[jira] [Updated] (SPARK-13482) `spark.storage.memoryMapThreshold` has two kinds of values.

2016-02-24 Thread SaintBacchus (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-13482?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

SaintBacchus updated SPARK-13482:
-
Description: 
`spark.storage.memoryMapThreshold` accepts two kinds of values: the integer
2*1024*1024 and the string '2m'.
"2m" is the form recommended in the documentation, but it goes wrong when the
code reaches *TransportConf#memoryMapBytes*.
Usage of `spark.storage.memoryMapThreshold`:
!https://issues.apache.org/jira/secure/attachment/12789859/2016-02-25_10-41-37.jpg!


  was:
`spark.storage.memoryMapThreshold` accepts two kinds of values: the integer
2*1024*1024 and the string '2m'.
"2m" is the form recommended in the documentation, but it goes wrong when the
code reaches "TransportConf#memoryMapBytes".
Usage of `spark.storage.memoryMapThreshold`:
!https://issues.apache.org/jira/secure/attachment/12789859/2016-02-25_10-41-37.jpg!



> `spark.storage.memoryMapThreshold` has two kinds of values.
> -
>
> Key: SPARK-13482
> URL: https://issues.apache.org/jira/browse/SPARK-13482
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager
>Affects Versions: 1.5.1, 1.6.0, 1.6.1, 2.0.0
>Reporter: SaintBacchus
> Attachments: 2016-02-25_10-41-37.jpg
>
>
> `spark.storage.memoryMapThreshold` accepts two kinds of values: the integer
> 2*1024*1024 and the string '2m'.
> "2m" is the form recommended in the documentation, but it goes wrong when the
> code reaches *TransportConf#memoryMapBytes*.
> Usage of `spark.storage.memoryMapThreshold`:
> !https://issues.apache.org/jira/secure/attachment/12789859/2016-02-25_10-41-37.jpg!
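
A minimal, self-contained sketch of the mismatch described above (illustrative only, not the actual Spark or TransportConf code): a size-string parser accepts both spellings, while an Integer.parseInt-style read of the documented "2m" form throws.
{code:title=Illustration of the two value forms (sketch)|borderStyle=solid}
object MemoryMapThresholdForms {
  // Toy size-string parser in the spirit of the documented "2m" form.
  def parseSizeAsBytes(v: String): Long = {
    val s = v.trim.toLowerCase
    if (s.endsWith("m")) s.dropRight(1).toLong * 1024 * 1024 else s.toLong
  }

  def main(args: Array[String]): Unit = {
    println(parseSizeAsBytes("2m"))      // 2097152: the documented spelling works here
    println(parseSizeAsBytes("2097152")) // 2097152: the integer spelling works too
    println("2097152".toInt)             // fine for the integer form
    println("2m".toInt)                  // NumberFormatException: what an
                                         // integer-only read hits for "2m"
  }
}
{code}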






[jira] [Updated] (SPARK-13482) `spark.storage.memoryMapThreshold` has two kinds of values.

2016-02-24 Thread SaintBacchus (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-13482?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

SaintBacchus updated SPARK-13482:
-
Description: 
`spark.storage.memoryMapThreshold` accepts two kinds of values: the integer
2*1024*1024 and the string '2m'.
"2m" is the form recommended in the documentation, but it goes wrong when the
code reaches "TransportConf#memoryMapBytes".
!https://issues.apache.org/jira/secure/attachment/12789859/2016-02-25_10-41-37.jpg!


  was:
`spark.storage.memoryMapThreshold` accepts two kinds of values: the integer
2*1024*1024 and the string '2m'.
"2m" is the form recommended in the documentation, but it goes wrong when the
code reaches "TransportConf#memoryMapBytes".
!!



> `spark.storage.memoryMapThreshold` has two kinds of values.
> -
>
> Key: SPARK-13482
> URL: https://issues.apache.org/jira/browse/SPARK-13482
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager
>Affects Versions: 1.5.1, 1.6.0, 1.6.1, 2.0.0
>Reporter: SaintBacchus
> Attachments: 2016-02-25_10-41-37.jpg
>
>
> `spark.storage.memoryMapThreshold` accepts two kinds of values: the integer
> 2*1024*1024 and the string '2m'.
> "2m" is the form recommended in the documentation, but it goes wrong when the
> code reaches "TransportConf#memoryMapBytes".
> !https://issues.apache.org/jira/secure/attachment/12789859/2016-02-25_10-41-37.jpg!






[jira] [Updated] (SPARK-13482) `spark.storage.memoryMapThreshold` has two kinds of values.

2016-02-24 Thread SaintBacchus (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-13482?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

SaintBacchus updated SPARK-13482:
-
Description: 
`spark.storage.memoryMapThreshold` accepts two kinds of values: the integer
2*1024*1024 and the string '2m'.
"2m" is the form recommended in the documentation, but it goes wrong when the
code reaches "TransportConf#memoryMapBytes".
Usage of `spark.storage.memoryMapThreshold`:
!https://issues.apache.org/jira/secure/attachment/12789859/2016-02-25_10-41-37.jpg!


  was:
`spark.storage.memoryMapThreshold` accepts two kinds of values: the integer
2*1024*1024 and the string '2m'.
"2m" is the form recommended in the documentation, but it goes wrong when the
code reaches "TransportConf#memoryMapBytes".
!https://issues.apache.org/jira/secure/attachment/12789859/2016-02-25_10-41-37.jpg!



> `spark.storage.memoryMapThreshold` has two kinds of values.
> -
>
> Key: SPARK-13482
> URL: https://issues.apache.org/jira/browse/SPARK-13482
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager
>Affects Versions: 1.5.1, 1.6.0, 1.6.1, 2.0.0
>Reporter: SaintBacchus
> Attachments: 2016-02-25_10-41-37.jpg
>
>
> `spark.storage.memoryMapThreshold` accepts two kinds of values: the integer
> 2*1024*1024 and the string '2m'.
> "2m" is the form recommended in the documentation, but it goes wrong when the
> code reaches "TransportConf#memoryMapBytes".
> Usage of `spark.storage.memoryMapThreshold`:
> !https://issues.apache.org/jira/secure/attachment/12789859/2016-02-25_10-41-37.jpg!






[jira] [Updated] (SPARK-13482) `spark.storage.memoryMapThreshold` has two kinds of values.

2016-02-24 Thread SaintBacchus (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-13482?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

SaintBacchus updated SPARK-13482:
-
Description: 
`spark.storage.memoryMapThreshold` accepts two kinds of values: the integer
2*1024*1024 and the string '2m'.
"2m" is the form recommended in the documentation, but it goes wrong when the
code reaches "TransportConf#memoryMapBytes".
!!


  was:
`spark.storage.memoryMapThreshold` accepts two kinds of values: the integer
2*1024*1024 and the string '2m'.
"2m" is the form recommended in the documentation, but it goes wrong when the
code reaches TransportConf#memoryMapBytes.


> `spark.storage.memoryMapThreshold` has two kinds of values.
> -
>
> Key: SPARK-13482
> URL: https://issues.apache.org/jira/browse/SPARK-13482
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager
>Affects Versions: 1.5.1, 1.6.0, 1.6.1, 2.0.0
>Reporter: SaintBacchus
> Attachments: 2016-02-25_10-41-37.jpg
>
>
> `spark.storage.memoryMapThreshold` accepts two kinds of values: the integer
> 2*1024*1024 and the string '2m'.
> "2m" is the form recommended in the documentation, but it goes wrong when the
> code reaches "TransportConf#memoryMapBytes".
> !!






[jira] [Updated] (SPARK-13482) `spark.storage.memoryMapThreshold` has two kinds of values.

2016-02-24 Thread SaintBacchus (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-13482?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

SaintBacchus updated SPARK-13482:
-
Attachment: 2016-02-25_10-41-37.jpg

> `spark.storage.memoryMapThreshold` has two kinds of values.
> -
>
> Key: SPARK-13482
> URL: https://issues.apache.org/jira/browse/SPARK-13482
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager
>Affects Versions: 1.5.1, 1.6.0, 1.6.1, 2.0.0
>Reporter: SaintBacchus
> Attachments: 2016-02-25_10-41-37.jpg
>
>
> `spark.storage.memoryMapThreshold` accepts two kinds of values: the integer
> 2*1024*1024 and the string '2m'.
> "2m" is the form recommended in the documentation, but it goes wrong when the
> code reaches TransportConf#memoryMapBytes.






[jira] [Created] (SPARK-13482) `spark.storage.memoryMapThreshold` has two kinds of values.

2016-02-24 Thread SaintBacchus (JIRA)
SaintBacchus created SPARK-13482:


 Summary: `spark.storage.memoryMapThreshold` has two kinds of values.
 Key: SPARK-13482
 URL: https://issues.apache.org/jira/browse/SPARK-13482
 Project: Spark
  Issue Type: Bug
  Components: Block Manager
Affects Versions: 1.6.0, 1.5.1, 1.6.1, 2.0.0
Reporter: SaintBacchus


`spark.storage.memoryMapThreshold` accepts two kinds of values: the integer
2*1024*1024 and the string '2m'.
"2m" is the form recommended in the documentation, but it goes wrong when the
code reaches TransportConf#memoryMapBytes.






[jira] [Commented] (SPARK-12316) Stack overflow with endless call of `Delegation token thread` when the application ends.

2016-02-16 Thread SaintBacchus (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15149810#comment-15149810
 ] 

SaintBacchus commented on SPARK-12316:
--

[~tgraves] The listFilesSorted call will not throw the exception; it only logs
it. So the update is not rescheduled an hour later; it is rescheduled
immediately and goes into another loop.
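
A schematic sketch of that loop (not the real ExecutorDelegationTokenUpdater code): when the failure path re-invokes the update method directly instead of scheduling a retry an hour later, every missing-token-file failure adds a stack frame.
{code:title=Schematic of the immediate-retry recursion (sketch)|borderStyle=solid}
object ImmediateRetrySketch {
  // Stand-in for updateCredentialsIfRequired: the token file is gone because
  // the AM already cleaned the staging dir, so every attempt fails.
  def updateCredentialsIfRequired(attempt: Long): Unit = {
    try {
      throw new java.io.FileNotFoundException("token file deleted with the staging dir")
    } catch {
      case e: Exception =>
        // Intended behaviour: log the error and schedule a retry one hour later.
        // Behaviour described in this comment: retry immediately; the recursive
        // call sits inside a catch block, so it is not tail-call optimised and
        // the stack grows until a StackOverflowError is thrown.
        updateCredentialsIfRequired(attempt + 1)
    }
  }

  def main(args: Array[String]): Unit = updateCredentialsIfRequired(0L)
}
{code}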

> Stack overflow with endless call of `Delegation token thread` when the
> application ends.
> ---
>
> Key: SPARK-12316
> URL: https://issues.apache.org/jira/browse/SPARK-12316
> Project: Spark
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.6.0
>Reporter: SaintBacchus
>Assignee: SaintBacchus
> Attachments: 20151210045149.jpg, 20151210045533.jpg
>
>
> When the application ends, the AM cleans the staging dir.
> But if the driver triggers a delegation token update, it cannot find the right
> token file and then endlessly calls the method 'updateCredentialsIfRequired'.
> This leads to a StackOverflowError.
> !https://issues.apache.org/jira/secure/attachment/12779495/20151210045149.jpg!
> !https://issues.apache.org/jira/secure/attachment/12779496/20151210045533.jpg!






[jira] [Commented] (SPARK-12316) Stack overflow with endless call of `Delegation token thread` when the application ends.

2016-02-15 Thread SaintBacchus (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15148023#comment-15148023
 ] 

SaintBacchus commented on SPARK-12316:
--

[~tgraves] The application would not hit the same condition, because one minute
later all the non-daemon threads have exited.
[~hshreedharan] The driver and the ApplicationMaster will both try to delete the
staging directory. If we want to make sure the ExecutorDelegationTokenUpdater has
stopped before the ApplicationMaster exits, we would have to add an RPC call
between these threads. So I think adding a retry after one minute may be an easy
way to avoid this issue.

> Stack overflow with endless call of `Delegation token thread` when the
> application ends.
> ---
>
> Key: SPARK-12316
> URL: https://issues.apache.org/jira/browse/SPARK-12316
> Project: Spark
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.6.0
>Reporter: SaintBacchus
>Assignee: SaintBacchus
> Attachments: 20151210045149.jpg, 20151210045533.jpg
>
>
> When the application ends, the AM cleans the staging dir.
> But if the driver triggers a delegation token update, it cannot find the right
> token file and then endlessly calls the method 'updateCredentialsIfRequired'.
> This leads to a StackOverflowError.
> !https://issues.apache.org/jira/secure/attachment/12779495/20151210045149.jpg!
> !https://issues.apache.org/jira/secure/attachment/12779496/20151210045533.jpg!






[jira] [Updated] (SPARK-12316) Stack overflow with endless call of `Delegation token thread` when the application ends.

2016-01-07 Thread SaintBacchus (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-12316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

SaintBacchus updated SPARK-12316:
-
Description: 
When the application ends, the AM cleans the staging dir.
But if the driver triggers a delegation token update, it cannot find the right
token file and then endlessly calls the method 'updateCredentialsIfRequired'.
This leads to a StackOverflowError.
!https://issues.apache.org/jira/secure/attachment/12779495/20151210045149.jpg!
!https://issues.apache.org/jira/secure/attachment/12779496/20151210045533.jpg!

  was:
When the application ends, the AM cleans the staging dir.
But if the driver triggers a delegation token update, it cannot find the right
token file and then endlessly calls the method 'updateCredentialsIfRequired'.
This leads to a StackOverflowError.
!https://issues.apache.org/jira/secure/attachment/12779495/20151210045149.jpg!
!


> Stack overflow with endless call of `Delegation token thread` when the
> application ends.
> ---
>
> Key: SPARK-12316
> URL: https://issues.apache.org/jira/browse/SPARK-12316
> Project: Spark
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.6.0
>Reporter: SaintBacchus
> Attachments: 20151210045149.jpg, 20151210045533.jpg
>
>
> When the application ends, the AM cleans the staging dir.
> But if the driver triggers a delegation token update, it cannot find the right
> token file and then endlessly calls the method 'updateCredentialsIfRequired'.
> This leads to a StackOverflowError.
> !https://issues.apache.org/jira/secure/attachment/12779495/20151210045149.jpg!
> !https://issues.apache.org/jira/secure/attachment/12779496/20151210045533.jpg!






[jira] [Created] (SPARK-12672) Streaming batch UI can't be opened from the jobs page in yarn mode.

2016-01-06 Thread SaintBacchus (JIRA)
SaintBacchus created SPARK-12672:


 Summary: Streaming batch UI can't be opened from the jobs page in yarn mode.
 Key: SPARK-12672
 URL: https://issues.apache.org/jira/browse/SPARK-12672
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.6.0, 1.6.1, 2.0.0
Reporter: SaintBacchus


The streaming batch UI can't be opened from the jobs page because it does not
use the proxy URL in yarn mode. The wrong URL and the right URL can be seen in
the pictures.






[jira] [Updated] (SPARK-12672) Streaming batch UI can't be opened from the jobs page in yarn mode.

2016-01-06 Thread SaintBacchus (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-12672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

SaintBacchus updated SPARK-12672:
-
Attachment: 2016-1-6 15-57-05.jpg
2016-1-6 15-56-26.jpg

> Streaming batch UI can't be opened from the jobs page in yarn mode.
> -
>
> Key: SPARK-12672
> URL: https://issues.apache.org/jira/browse/SPARK-12672
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.6.0, 1.6.1, 2.0.0
>Reporter: SaintBacchus
> Attachments: 2016-1-6 15-56-26.jpg, 2016-1-6 15-57-05.jpg
>
>
> The streaming batch UI can't be opened from the jobs page because it does not
> use the proxy URL in yarn mode. The wrong URL and the right URL can be seen in
> the pictures.






[jira] [Created] (SPARK-12523) Support long-running Spark on HBase and the Hive metastore.

2015-12-25 Thread SaintBacchus (JIRA)
SaintBacchus created SPARK-12523:


 Summary: Support long-running Spark on HBase and the Hive metastore.
 Key: SPARK-12523
 URL: https://issues.apache.org/jira/browse/SPARK-12523
 Project: Spark
  Issue Type: Improvement
  Components: YARN
Affects Versions: 2.0.0
Reporter: SaintBacchus


*AMDelegationTokenRenewer* currently only obtains the HDFS token in the AM. If
we want long-running Spark on HBase or against the Hive metastore, we should
obtain those tokens as well.






[jira] [Updated] (SPARK-12316) Stack overflow with endless call of `Delegation token thread` when the application ends.

2015-12-24 Thread SaintBacchus (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-12316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

SaintBacchus updated SPARK-12316:
-
Attachment: 20151210045533.jpg
20151210045149.jpg

> Stack overflow with endless call of `Delegation token thread` when the
> application ends.
> ---
>
> Key: SPARK-12316
> URL: https://issues.apache.org/jira/browse/SPARK-12316
> Project: Spark
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.6.0
>Reporter: SaintBacchus
> Attachments: 20151210045149.jpg, 20151210045533.jpg
>
>
> When the application ends, the AM cleans the staging dir.
> But if the driver triggers a delegation token update, it cannot find the right
> token file and then endlessly calls the method 'updateCredentialsIfRequired'.
> This leads to a StackOverflowError.






[jira] [Created] (SPARK-12316) Stack overflow with endless call of `Delegation token thread` when the application ends.

2015-12-13 Thread SaintBacchus (JIRA)
SaintBacchus created SPARK-12316:


 Summary: Stack overflow with endless call of `Delegation token thread` when
the application ends.
 Key: SPARK-12316
 URL: https://issues.apache.org/jira/browse/SPARK-12316
 Project: Spark
  Issue Type: Bug
  Components: YARN
Affects Versions: 1.6.0
Reporter: SaintBacchus


When the application ends, the AM cleans the staging dir.
But if the driver triggers a delegation token update, it cannot find the right
token file and then endlessly calls the method 'updateCredentialsIfRequired'.






[jira] [Updated] (SPARK-12316) Stack overflow with endless call of `Delegation token thread` when the application ends.

2015-12-13 Thread SaintBacchus (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-12316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

SaintBacchus updated SPARK-12316:
-
Description: 
When the application ends, the AM cleans the staging dir.
But if the driver triggers a delegation token update, it cannot find the right
token file and then endlessly calls the method 'updateCredentialsIfRequired'.
This leads to a StackOverflowError.

  was:
When the application ends, the AM cleans the staging dir.
But if the driver triggers a delegation token update, it cannot find the right
token file and then endlessly calls the method 'updateCredentialsIfRequired'.


> Stack overflow with endless call of `Delegation token thread` when the
> application ends.
> ---
>
> Key: SPARK-12316
> URL: https://issues.apache.org/jira/browse/SPARK-12316
> Project: Spark
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.6.0
>Reporter: SaintBacchus
>
> When the application ends, the AM cleans the staging dir.
> But if the driver triggers a delegation token update, it cannot find the right
> token file and then endlessly calls the method 'updateCredentialsIfRequired'.
> This leads to a StackOverflowError.






[jira] [Created] (SPARK-11043) Hive Thrift Server will log the warning "Couldn't find log associated with operation handle"

2015-10-09 Thread SaintBacchus (JIRA)
SaintBacchus created SPARK-11043:


 Summary: Hive Thrift Server will log the warning "Couldn't find log 
associated with operation handle"
 Key: SPARK-11043
 URL: https://issues.apache.org/jira/browse/SPARK-11043
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.6.0
Reporter: SaintBacchus


The warning log is below:
{code:title=Warning Log|borderStyle=solid}
15/10/09 16:48:23 WARN thrift.ThriftCLIService: Error fetching results: 
org.apache.hive.service.cli.HiveSQLException: Couldn't find log associated with 
operation handle: OperationHandle [opType=EXECUTE_STATEMENT, 
getHandleIdentifier()=fb0900c7-6244-432e-a779-b449ca7f7ca0]
at 
org.apache.hive.service.cli.operation.OperationManager.getOperationLogRowSet(OperationManager.java:229)
at 
org.apache.hive.service.cli.session.HiveSessionImpl.fetchResults(HiveSessionImpl.java:687)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at 
org.apache.hive.service.cli.session.HiveSessionProxy.invoke(HiveSessionProxy.java:78)
at 
org.apache.hive.service.cli.session.HiveSessionProxy.access$000(HiveSessionProxy.java:36)
at 
org.apache.hive.service.cli.session.HiveSessionProxy$1.run(HiveSessionProxy.java:63)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
at 
org.apache.hive.service.cli.session.HiveSessionProxy.invoke(HiveSessionProxy.java:59)
at com.sun.proxy.$Proxy32.fetchResults(Unknown Source)
at 
org.apache.hive.service.cli.CLIService.fetchResults(CLIService.java:454)
at 
org.apache.hive.service.cli.thrift.ThriftCLIService.FetchResults(ThriftCLIService.java:672)
at 
org.apache.hive.service.cli.thrift.TCLIService$Processor$FetchResults.getResult(TCLIService.java:1553)
at 
org.apache.hive.service.cli.thrift.TCLIService$Processor$FetchResults.getResult(TCLIService.java:1538)
at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
at 
org.apache.hive.service.auth.TSetIpAddressProcessor.process(TSetIpAddressProcessor.java:56)
at 
org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:285)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
{code}






[jira] [Updated] (SPARK-11043) Hive Thrift Server will log the warning "Couldn't find log associated with operation handle"

2015-10-09 Thread SaintBacchus (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-11043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

SaintBacchus updated SPARK-11043:
-
Description: 
The warning log is below:
{code:title=Warning Log|borderStyle=solid}
15/10/09 16:48:23 WARN thrift.ThriftCLIService: Error fetching results: 
org.apache.hive.service.cli.HiveSQLException: Couldn't find log associated with 
operation handle: OperationHandle [opType=EXECUTE_STATEMENT, 
getHandleIdentifier()=fb0900c7-6244-432e-a779-b449ca7f7ca0]
at 
org.apache.hive.service.cli.operation.OperationManager.getOperationLogRowSet(OperationManager.java:229)
at 
org.apache.hive.service.cli.session.HiveSessionImpl.fetchResults(HiveSessionImpl.java:687)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at 
org.apache.hive.service.cli.session.HiveSessionProxy.invoke(HiveSessionProxy.java:78)
at 
org.apache.hive.service.cli.session.HiveSessionProxy.access$000(HiveSessionProxy.java:36)
at 
org.apache.hive.service.cli.session.HiveSessionProxy$1.run(HiveSessionProxy.java:63)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
at 
org.apache.hive.service.cli.session.HiveSessionProxy.invoke(HiveSessionProxy.java:59)
at com.sun.proxy.$Proxy32.fetchResults(Unknown Source)
at 
org.apache.hive.service.cli.CLIService.fetchResults(CLIService.java:454)
at 
org.apache.hive.service.cli.thrift.ThriftCLIService.FetchResults(ThriftCLIService.java:672)
at 
org.apache.hive.service.cli.thrift.TCLIService$Processor$FetchResults.getResult(TCLIService.java:1553)
at 
org.apache.hive.service.cli.thrift.TCLIService$Processor$FetchResults.getResult(TCLIService.java:1538)
at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
at 
org.apache.hive.service.auth.TSetIpAddressProcessor.process(TSetIpAddressProcessor.java:56)
at 
org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:285)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
{code}
Once I execute a statement, this warning is logged with the default
configuration.

  was:
The warning log is below:
{code:title=Warning Log|borderStyle=solid}
15/10/09 16:48:23 WARN thrift.ThriftCLIService: Error fetching results: 
org.apache.hive.service.cli.HiveSQLException: Couldn't find log associated with 
operation handle: OperationHandle [opType=EXECUTE_STATEMENT, 
getHandleIdentifier()=fb0900c7-6244-432e-a779-b449ca7f7ca0]
at 
org.apache.hive.service.cli.operation.OperationManager.getOperationLogRowSet(OperationManager.java:229)
at 
org.apache.hive.service.cli.session.HiveSessionImpl.fetchResults(HiveSessionImpl.java:687)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at 
org.apache.hive.service.cli.session.HiveSessionProxy.invoke(HiveSessionProxy.java:78)
at 
org.apache.hive.service.cli.session.HiveSessionProxy.access$000(HiveSessionProxy.java:36)
at 
org.apache.hive.service.cli.session.HiveSessionProxy$1.run(HiveSessionProxy.java:63)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
at 
org.apache.hive.service.cli.session.HiveSessionProxy.invoke(HiveSessionProxy.java:59)
at com.sun.proxy.$Proxy32.fetchResults(Unknown Source)
at 
org.apache.hive.service.cli.CLIService.fetchResults(CLIService.java:454)
at 
org.apache.hive.service.cli.thrift.ThriftCLIService.FetchResults(ThriftCLIService.java:672)
at 
org.apache.hive.service.cli.thrift.TCLIService$Processor$FetchResults.getResult(TCLIService.java:1553)
at 
org.apache.hive.service.cli.thrift.TCLIService$Processor$FetchResults.getResult(TCLIService.java:1538)
at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
at 

[jira] [Created] (SPARK-11000) Derby has booted the database twice in yarn security mode.

2015-10-08 Thread SaintBacchus (JIRA)
SaintBacchus created SPARK-11000:


 Summary: Derby has booted the database twice in yarn security mode.
 Key: SPARK-11000
 URL: https://issues.apache.org/jira/browse/SPARK-11000
 Project: Spark
  Issue Type: Bug
  Components: Spark Shell, SQL, YARN
Affects Versions: 1.6.0
Reporter: SaintBacchus


*bin/spark-shell --master yarn-client*
If Spark was built with Hive, even this simple command will hit a problem:
_Another instance of Derby may have already booted the database_
{code:title=Exception|borderStyle=solid}
Caused by: java.sql.SQLException: Another instance of Derby may have already 
booted the database /opt/client/Spark/spark/metastore_db.
at 
org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown Source)
at 
org.apache.derby.impl.jdbc.SQLExceptionFactory40.wrapArgsForTransportAcrossDRDA(Unknown
 Source)
at 
org.apache.derby.impl.jdbc.SQLExceptionFactory40.getSQLException(Unknown Source)
at org.apache.derby.impl.jdbc.Util.generateCsSQLException(Unknown 
Source)
... 130 more
Caused by: ERROR XSDB6: Another instance of Derby may have already booted the 
database /opt/client/Spark/spark/metastore_db.
{code}






[jira] [Commented] (SPARK-11000) Derby has booted the database twice in yarn security mode.

2015-10-08 Thread SaintBacchus (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-11000?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14948138#comment-14948138
 ] 

SaintBacchus commented on SPARK-11000:
--

Very similar to it, but this one is in yarn security mode.

> Derby has booted the database twice in yarn security mode.
> ---
>
> Key: SPARK-11000
> URL: https://issues.apache.org/jira/browse/SPARK-11000
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Shell, SQL, YARN
>Affects Versions: 1.6.0
>Reporter: SaintBacchus
>
> *bin/spark-shell --master yarn-client*
> If Spark was built with Hive, even this simple command will hit a problem:
> _Another instance of Derby may have already booted the database_
> {code:title=Exception|borderStyle=solid}
> Caused by: java.sql.SQLException: Another instance of Derby may have already 
> booted the database /opt/client/Spark/spark/metastore_db.
> at 
> org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown Source)
> at 
> org.apache.derby.impl.jdbc.SQLExceptionFactory40.wrapArgsForTransportAcrossDRDA(Unknown
>  Source)
> at 
> org.apache.derby.impl.jdbc.SQLExceptionFactory40.getSQLException(Unknown 
> Source)
> at org.apache.derby.impl.jdbc.Util.generateCsSQLException(Unknown 
> Source)
> ... 130 more
> Caused by: ERROR XSDB6: Another instance of Derby may have already booted the 
> database /opt/client/Spark/spark/metastore_db.
> {code}






[jira] [Issue Comment Deleted] (SPARK-11000) Derby has booted the database twice in yarn security mode.

2015-10-08 Thread SaintBacchus (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-11000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

SaintBacchus updated SPARK-11000:
-
Comment: was deleted

(was: Very similar to it, but this one is in yarn security mode.)

> Derby has booted the database twice in yarn security mode.
> ---
>
> Key: SPARK-11000
> URL: https://issues.apache.org/jira/browse/SPARK-11000
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Shell, SQL, YARN
>Affects Versions: 1.6.0
>Reporter: SaintBacchus
>
> *bin/spark-shell --master yarn-client*
> If Spark was built with Hive, even this simple command will hit a problem:
> _Another instance of Derby may have already booted the database_
> {code:title=Exception|borderStyle=solid}
> Caused by: java.sql.SQLException: Another instance of Derby may have already 
> booted the database /opt/client/Spark/spark/metastore_db.
> at 
> org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown Source)
> at 
> org.apache.derby.impl.jdbc.SQLExceptionFactory40.wrapArgsForTransportAcrossDRDA(Unknown
>  Source)
> at 
> org.apache.derby.impl.jdbc.SQLExceptionFactory40.getSQLException(Unknown 
> Source)
> at org.apache.derby.impl.jdbc.Util.generateCsSQLException(Unknown 
> Source)
> ... 130 more
> Caused by: ERROR XSDB6: Another instance of Derby may have already booted the 
> database /opt/client/Spark/spark/metastore_db.
> {code}






[jira] [Updated] (SPARK-10473) EventLog will lose messages in a long-running security application

2015-09-29 Thread SaintBacchus (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-10473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

SaintBacchus updated SPARK-10473:
-
Affects Version/s: 1.6.0

> EventLog will lose messages in a long-running security application
> ---
>
> Key: SPARK-10473
> URL: https://issues.apache.org/jira/browse/SPARK-10473
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, Web UI
>Affects Versions: 1.5.0, 1.6.0
>Reporter: SaintBacchus
>
> In the implementation of *EventLoggingListener*, there is only one
> OutputStream writing event messages to HDFS.
> But when the token of the *DFSClient* behind that OutputStream expires, the
> DFSClient no longer has the right to write, and all subsequent messages are lost.






[jira] [Closed] (SPARK-10755) Make the driver also update the token for long-running applications

2015-09-25 Thread SaintBacchus (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-10755?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

SaintBacchus closed SPARK-10755.

Resolution: Not A Problem

It's not a good idea to do so, since the Hadoop RPC layer can automatically renew
the token with the keytab, but it still can't renew it successfully.
This solution does not address the root cause of the problem.


> Make the driver also update the token for long-running applications
> -
>
> Key: SPARK-10755
> URL: https://issues.apache.org/jira/browse/SPARK-10755
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.6.0
>Reporter: SaintBacchus
>
> In yarn-client mode, the driver writes the event logs to HDFS and gets the
> partition information from HDFS, so it is necessary to update the token from
> the *AMDelegationTokenRenewer*.
> In yarn-cluster mode, the driver is co-located with the AM and the token is
> updated by the AM. But it is still better to update the token for the client
> process, since the client wants to delete the staging dir with an expired token.






[jira] [Created] (SPARK-10786) SparkSQLCLIDriver should take the whole statement to generate the CommandProcessor

2015-09-23 Thread SaintBacchus (JIRA)
SaintBacchus created SPARK-10786:


 Summary: SparkSQLCLIDriver should take the whole statement to 
generate the CommandProcessor
 Key: SPARK-10786
 URL: https://issues.apache.org/jira/browse/SPARK-10786
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.6.0
Reporter: SaintBacchus
Priority: Minor


In the current implementation of SparkSQLCLIDriver.scala:
*val proc: CommandProcessor = CommandProcessorFactory.get(Array(tokens(0)), 
hconf)*

*CommandProcessorFactory* only receives the first token of the statement, which
makes it hard to distinguish the statements *delete jar xxx* and *delete from xxx*.
So it may be better to pass the whole statement into
*CommandProcessorFactory*, as sketched below.
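
A hedged sketch of that proposal (assuming the Hive CommandProcessorFactory.get(String[], HiveConf) overload already used by the snippet above; the tokenization and the helper name processorFor are placeholders for illustration):
{code:title=Passing the whole statement (sketch)|borderStyle=solid}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.processors.{CommandProcessor, CommandProcessorFactory}

object CommandProcessorSketch {
  // Proposed direction: hand the whole tokenized statement to the factory so
  // "delete jar foo.jar" and "delete from tbl" can be told apart.
  def processorFor(statement: String, hconf: HiveConf): CommandProcessor = {
    val tokens = statement.trim.split("""\s+""")
    // Current code: CommandProcessorFactory.get(Array(tokens(0)), hconf)
    CommandProcessorFactory.get(tokens, hconf)
  }
}
{code}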






[jira] [Created] (SPARK-10766) Add some configurations for the client process in yarn-cluster mode.

2015-09-22 Thread SaintBacchus (JIRA)
SaintBacchus created SPARK-10766:


 Summary: Add some configurations for the client process in 
yarn-cluster mode. 
 Key: SPARK-10766
 URL: https://issues.apache.org/jira/browse/SPARK-10766
 Project: Spark
  Issue Type: Improvement
  Components: YARN
Affects Versions: 1.6.0
Reporter: SaintBacchus


In yarn-cluster mode, it's hard to find the correct configuration for the
client process.
But this is necessary, for example for the client process's classpath: if I want
to use HBase on Spark, I have to include the HBase jars in the client's classpath.
*spark.driver.extraClassPath* doesn't take effect there; the only way I have found
is to put the HBase jars into the SPARK_CLASSPATH environment variable.
That isn't a good approach, so I want to add some configurations for this client
process.







[jira] [Created] (SPARK-10755) Make the driver also update the token for long-running applications

2015-09-22 Thread SaintBacchus (JIRA)
SaintBacchus created SPARK-10755:


 Summary: Make the driver also update the token for long-running
applications
 Key: SPARK-10755
 URL: https://issues.apache.org/jira/browse/SPARK-10755
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.6.0
Reporter: SaintBacchus


In yarn-client mode, the driver writes the event logs to HDFS and gets the
partition information from HDFS, so it is necessary to update the token from the
*AMDelegationTokenRenewer*.
In yarn-cluster mode, the driver is co-located with the AM and the token is
updated by the AM. But it is still better to update the token for the client
process, since the client wants to delete the staging dir with an expired token.






[jira] [Created] (SPARK-10473) EventLog will lose messages in a long-running security application

2015-09-07 Thread SaintBacchus (JIRA)
SaintBacchus created SPARK-10473:


 Summary: EventLog will lose messages in a long-running security
application
 Key: SPARK-10473
 URL: https://issues.apache.org/jira/browse/SPARK-10473
 Project: Spark
  Issue Type: Bug
  Components: Spark Core, Web UI
Affects Versions: 1.5.0
Reporter: SaintBacchus


In the implementation of *EventLoggingListener*, there is only one
OutputStream writing event messages to HDFS.
But when the token of the *DFSClient* behind that OutputStream expires, the
DFSClient no longer has the right to write, and all subsequent messages are lost.






[jira] [Commented] (SPARK-9091) Add the codec interface to Text DStream.

2015-07-16 Thread SaintBacchus (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14629483#comment-14629483
 ] 

SaintBacchus commented on SPARK-9091:
-

[~srowen] Sorry for forgetting to change the type and add the description
right away; it's a small improvement to DStream.
I will reopen it once I have the PR.


 Add the codec interface to Text DStream.
 

 Key: SPARK-9091
 URL: https://issues.apache.org/jira/browse/SPARK-9091
 Project: Spark
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 1.5.0
Reporter: SaintBacchus
Priority: Minor

 Since RDD has the function *saveAsTextFile*, which can use 
 *CompressionCodec* to compress the data, it would be good to add a similar 
 interface to DStream






[jira] [Updated] (SPARK-9091) Add the codec interface to Text DStream.

2015-07-16 Thread SaintBacchus (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-9091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

SaintBacchus updated SPARK-9091:

Description: Since RDD has the function *saveAsTextFile*, which can use
*CompressionCodec* to compress the data, it would be good to add a similar
interface to DStream  (was: Add description later.)

 Add the codec interface to Text DStream.
 

 Key: SPARK-9091
 URL: https://issues.apache.org/jira/browse/SPARK-9091
 Project: Spark
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 1.5.0
Reporter: SaintBacchus
Priority: Minor

 Since RDD has the function *saveAsTextFile*, which can use 
 *CompressionCodec* to compress the data, it would be good to add a similar 
 interface to DStream






[jira] [Updated] (SPARK-9091) Add the codec interface to Text DStream.

2015-07-16 Thread SaintBacchus (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-9091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

SaintBacchus updated SPARK-9091:

Description: 
Since RDD has the function *saveAsTextFile*, which can use
*CompressionCodec* to compress the data, it would be good to add a similar
interface to DStream.
In IO-bottlenecked scenarios, it is very useful for users to have this
interface on DStream.

  was:
Since RDD has the function *saveAsTextFile*, which can use
*CompressionCodec* to compress the data, it would be good to add a similar
interface to DStream.
In IO-bottlenecked scenarios, it is very useful for users to have this interface.


 Add the codec interface to Text DStream.
 

 Key: SPARK-9091
 URL: https://issues.apache.org/jira/browse/SPARK-9091
 Project: Spark
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 1.5.0
Reporter: SaintBacchus
Priority: Minor

 Since RDD has the function *saveAsTextFile*, which can use 
 *CompressionCodec* to compress the data, it would be good to add a similar 
 interface to DStream. 
 In IO-bottlenecked scenarios, it is very useful for users to have this 
 interface on DStream.






[jira] [Updated] (SPARK-9091) Add the codec interface to Text DStream.

2015-07-16 Thread SaintBacchus (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-9091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

SaintBacchus updated SPARK-9091:

Summary: Add the codec interface to Text DStream.  (was: Add the codec 
interface to DStream.)

 Add the codec interface to Text DStream.
 

 Key: SPARK-9091
 URL: https://issues.apache.org/jira/browse/SPARK-9091
 Project: Spark
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 1.5.0
Reporter: SaintBacchus
Priority: Minor

 Add description later.






[jira] [Updated] (SPARK-9091) Add the codec interface to DStream.

2015-07-16 Thread SaintBacchus (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-9091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

SaintBacchus updated SPARK-9091:

Priority: Minor  (was: Major)

 Add the codec interface to DStream.
 ---

 Key: SPARK-9091
 URL: https://issues.apache.org/jira/browse/SPARK-9091
 Project: Spark
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 1.5.0
Reporter: SaintBacchus
Priority: Minor

 Add description later.






[jira] [Updated] (SPARK-9091) Add the codec interface to DStream.

2015-07-16 Thread SaintBacchus (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-9091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

SaintBacchus updated SPARK-9091:

Issue Type: Improvement  (was: Bug)

 Add the codec interface to DStream.
 ---

 Key: SPARK-9091
 URL: https://issues.apache.org/jira/browse/SPARK-9091
 Project: Spark
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 1.5.0
Reporter: SaintBacchus

 Add description later.






[jira] [Updated] (SPARK-9091) Add the codec interface to Text DStream.

2015-07-16 Thread SaintBacchus (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-9091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

SaintBacchus updated SPARK-9091:

Description: 
Since RDD has the function *saveAsTextFile*, which can use
*CompressionCodec* to compress the data, it would be good to add a similar
interface to DStream.
In IO-bottlenecked scenarios, it is very useful for users to have this interface.

  was:
Since RDD has the function *saveAsTextFile*, which can use
*CompressionCodec* to compress the data, it would be good to add a similar
interface to DStream.
In IO-bottlenecked scenarios, it's very 


 Add the codec interface to Text DStream.
 

 Key: SPARK-9091
 URL: https://issues.apache.org/jira/browse/SPARK-9091
 Project: Spark
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 1.5.0
Reporter: SaintBacchus
Priority: Minor

 Since RDD has the function *saveAsTextFile*, which can use 
 *CompressionCodec* to compress the data, it would be good to add a similar 
 interface to DStream. 
 In IO-bottlenecked scenarios, it is very useful for users to have this 
 interface.






[jira] [Updated] (SPARK-9091) Add the codec interface to Text DStream.

2015-07-16 Thread SaintBacchus (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-9091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

SaintBacchus updated SPARK-9091:

Description: 
Since RDD has the function *saveAsTextFile*, which can use
*CompressionCodec* to compress the data, it would be good to add a similar
interface to DStream.
In IO-bottlenecked scenarios, it's very 

  was:Since RDD has the function *saveAsTextFile*, which can use
*CompressionCodec* to compress the data, it would be good to add a similar
interface to DStream


 Add the codec interface to Text DStream.
 

 Key: SPARK-9091
 URL: https://issues.apache.org/jira/browse/SPARK-9091
 Project: Spark
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 1.5.0
Reporter: SaintBacchus
Priority: Minor

 Since RDD has the function *saveAsTextFile*, which can use 
 *CompressionCodec* to compress the data, it would be good to add a similar 
 interface to DStream. 
 In IO-bottlenecked scenarios, it's very 






[jira] [Commented] (SPARK-9091) Add the codec interface to Text DStream.

2015-07-16 Thread SaintBacchus (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-9091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14629639#comment-14629639
 ] 

SaintBacchus commented on SPARK-9091:
-

[~sowen] I agree a user can build this output with DStream.foreachRDD; I propose
it for convenience.
In my case, I had to copy a bit of code from Spark to get this behaviour, and I
guess others may hit the same scenario, so I opened this JIRA to push it into Spark.
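
A hedged sketch of that foreachRDD workaround (the GzipCodec choice and the prefix naming are illustrative assumptions, not part of this proposal):
{code:title=Compressed text output via foreachRDD (sketch)|borderStyle=solid}
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.streaming.dstream.DStream

object CompressedTextOutput {
  // What users can already do today: drop to the RDD API inside foreachRDD and
  // use the codec-aware saveAsTextFile overload for each batch.
  def saveCompressed(stream: DStream[String], prefix: String): Unit = {
    stream.foreachRDD { (rdd, time) =>
      rdd.saveAsTextFile(s"$prefix-${time.milliseconds}", classOf[GzipCodec])
    }
  }
}
{code}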

 Add the codec interface to Text DStream.
 

 Key: SPARK-9091
 URL: https://issues.apache.org/jira/browse/SPARK-9091
 Project: Spark
  Issue Type: Improvement
  Components: Streaming
Reporter: SaintBacchus
Priority: Minor

 Since RDD has the function *saveAsTextFile*, which can use 
 *CompressionCodec* to compress the data, it would be good to add a similar 
 interface to DStream. 
 In IO-bottlenecked scenarios, it is very useful for users to have this 
 interface on DStream.






[jira] [Updated] (SPARK-9091) Add the codec interface to DStream.

2015-07-15 Thread SaintBacchus (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-9091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

SaintBacchus updated SPARK-9091:

Description: Add description later.

 Add the codec interface to DStream.
 ---

 Key: SPARK-9091
 URL: https://issues.apache.org/jira/browse/SPARK-9091
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.5.0
Reporter: SaintBacchus

 Add description later.






[jira] [Created] (SPARK-9091) Add the codec interface to DStream.

2015-07-15 Thread SaintBacchus (JIRA)
SaintBacchus created SPARK-9091:
---

 Summary: Add the codec interface to DStream.
 Key: SPARK-9091
 URL: https://issues.apache.org/jira/browse/SPARK-9091
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.5.0
Reporter: SaintBacchus









[jira] [Created] (SPARK-8839) Thrift Server will throw a `java.util.NoSuchElementException: key not found` exception when many clients connect to it

2015-07-06 Thread SaintBacchus (JIRA)
SaintBacchus created SPARK-8839:
---

 Summary: Thrift Server will throw a
`java.util.NoSuchElementException: key not found` exception when many clients
connect to it
 Key: SPARK-8839
 URL: https://issues.apache.org/jira/browse/SPARK-8839
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.5.0
Reporter: SaintBacchus


If there are about 150+ JDBC clients connecting to the Thrift Server, some
clients will throw an exception such as:
{code:title=Exception message|borderStyle=solid}
java.sql.SQLException: java.util.NoSuchElementException: key not found: 
90d93e56-7f6d-45bf-b340-e3ee09dd60fc
 at org.apache.hive.jdbc.Utils.verifySuccess(Utils.java:155)
{code}
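
A hedged sketch of the load pattern described above (the JDBC URL, credentials and query are placeholders): many concurrent Hive JDBC clients hitting one Thrift Server.
{code:title=Many concurrent JDBC clients (sketch)|borderStyle=solid}
import java.sql.DriverManager
import java.util.concurrent.Executors

object ManyThriftClients {
  def main(args: Array[String]): Unit = {
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    val pool = Executors.newFixedThreadPool(150)
    (1 to 150).foreach { _ =>
      pool.submit(new Runnable {
        override def run(): Unit = {
          // Placeholder connection string; each client runs a trivial query.
          val conn = DriverManager.getConnection("jdbc:hive2://thrift-host:10000/default", "user", "")
          try {
            val rs = conn.createStatement().executeQuery("SELECT 1")
            while (rs.next()) {}
          } finally conn.close()
        }
      })
    }
    pool.shutdown()
  }
}
{code}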






[jira] [Created] (SPARK-8820) Add a configuration to set the checkpoint directory for convenience.

2015-07-03 Thread SaintBacchus (JIRA)
SaintBacchus created SPARK-8820:
---

 Summary: Add a configuration to set the checkpoint directory for 
convenience.
 Key: SPARK-8820
 URL: https://issues.apache.org/jira/browse/SPARK-8820
 Project: Spark
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 1.5.0
Reporter: SaintBacchus


Add a configuration named *spark.streaming.checkpointDir* to set the
checkpoint directory.
It will be overridden if the user also calls *StreamingContext#checkpoint*.
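
A sketch contrasting today's API call with the proposal (note: *spark.streaming.checkpointDir* is the key proposed in this issue, not an existing Spark setting; the paths are placeholders):
{code:title=Current API vs. proposed configuration (sketch)|borderStyle=solid}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointDirSketch {
  def main(args: Array[String]): Unit = {
    // Today the checkpoint directory must be set in code:
    val conf = new SparkConf().setAppName("checkpoint-dir-sketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(2))
    ssc.checkpoint("hdfs:///user/spark/checkpoints/demo")

    // Proposed alternative: supply it as configuration instead, e.g.
    //   --conf spark.streaming.checkpointDir=hdfs:///user/spark/checkpoints/demo
    // with an explicit ssc.checkpoint(...) call still taking precedence.
  }
}
{code}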






[jira] [Updated] (SPARK-8755) Streaming application from checkpoint will fail to load in security mode.

2015-07-01 Thread SaintBacchus (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-8755?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

SaintBacchus updated SPARK-8755:

Description: 
If the user sets *spark.yarn.principal* and *spark.yarn.keytab*, he does not
need to *kinit* on the client machine.
But when the application is recovered from a checkpoint file, he has to *kinit*,
because:
the checkpoint code does not apply these configurations before it uses a
DFSClient to fetch the checkpoint file.

  was:
If the user sets *spark.yarn.principal* and *spark.yarn.keytab*, he does not
need to *kinit* on the client machine.
But when the application is recovered from a checkpoint file, he has to *kinit*,
because:
the checkpoint code does not apply these configurations before it uses a
DFSClient to fetch the checkpoint file.


 Streaming application from checkpoint will fail to load in security mode.
 -

 Key: SPARK-8755
 URL: https://issues.apache.org/jira/browse/SPARK-8755
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.5.0
Reporter: SaintBacchus

 If the user sets *spark.yarn.principal* and *spark.yarn.keytab*, he does not 
 need to *kinit* on the client machine.
 But when the application is recovered from a checkpoint file, he has to 
 *kinit*, because:
 the checkpoint code does not apply these configurations before it uses a 
 DFSClient to fetch the checkpoint file.
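
A sketch of the recovery path being discussed (the checkpoint directory, principal and keytab values are placeholders): StreamingContext.getOrCreate reads the checkpoint from HDFS before the spark.yarn.principal / spark.yarn.keytab settings are ever applied, which is why a manual kinit is currently needed.
{code:title=Checkpoint recovery path (sketch)|borderStyle=solid}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointRecoverySketch {
  val checkpointDir = "hdfs:///user/spark/checkpoints/app"   // placeholder

  def createContext(): StreamingContext = {
    // The master (e.g. yarn-client) is expected to come from spark-submit.
    val conf = new SparkConf()
      .setAppName("recovery-sketch")
      .set("spark.yarn.principal", "spark/_HOST@EXAMPLE.COM")         // placeholder
      .set("spark.yarn.keytab", "/etc/security/keytabs/spark.keytab") // placeholder
    val ssc = new StreamingContext(conf, Seconds(2))
    ssc.checkpoint(checkpointDir)
    ssc
  }

  def main(args: Array[String]): Unit = {
    // getOrCreate first tries to read checkpointDir with a DFSClient; at that
    // point the principal/keytab configuration above has not been used yet.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}
{code}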






[jira] [Created] (SPARK-8755) Streaming application from checkpoint will fail to load in security mode.

2015-07-01 Thread SaintBacchus (JIRA)
SaintBacchus created SPARK-8755:
---

 Summary: Streaming application from checkpoint will fail to load 
in security mode.
 Key: SPARK-8755
 URL: https://issues.apache.org/jira/browse/SPARK-8755
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.5.0
Reporter: SaintBacchus


If the user sets *spark.yarn.principal* and *spark.yarn.keytab*, he does not
need to *kinit* on the client machine.
But when the application is recovered from a checkpoint file, he has to *kinit*,
because:
the checkpoint code does not apply these configurations before it uses a
DFSClient to fetch the checkpoint file.






[jira] [Updated] (SPARK-8687) Spark on yarn-client mode can't send `spark.yarn.credentials.file` to executor.

2015-06-29 Thread SaintBacchus (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-8687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

SaintBacchus updated SPARK-8687:

Summary: Spark on yarn-client mode can't send `spark.yarn.credentials.file` 
to executor.  (was: Spark on yarn can't send `spark.yarn.credentials.file` to 
executor.)

 Spark on yarn-client mode can't send `spark.yarn.credentials.file` to 
 executor.
 ---

 Key: SPARK-8687
 URL: https://issues.apache.org/jira/browse/SPARK-8687
 Project: Spark
  Issue Type: Bug
  Components: YARN
Affects Versions: 1.5.0
Reporter: SaintBacchus

 Yarn sets +spark.yarn.credentials.file+ after *DriverEndpoint* is 
 initialized, so the executor fetches the old configuration, which causes the 
 problem.






[jira] [Created] (SPARK-8688) Hadoop Configuration has to disable client cache when writing or reading delegation tokens.

2015-06-28 Thread SaintBacchus (JIRA)
SaintBacchus created SPARK-8688:
---

 Summary: Hadoop Configuration has to disable client cache when 
writing or reading delegation tokens.
 Key: SPARK-8688
 URL: https://issues.apache.org/jira/browse/SPARK-8688
 Project: Spark
  Issue Type: Bug
  Components: YARN
Affects Versions: 1.5.0
Reporter: SaintBacchus


In the classes *AMDelegationTokenRenewer* and *ExecutorDelegationTokenUpdater*, Spark 
writes and reads the credentials.
But if we don't set *fs.hdfs.impl.disable.cache* to true, Spark will use a cached
FileSystem (which holds the old token) to upload or download the file.
Then, once the old token expires, it no longer has the authorization to read from or write to HDFS.
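A minimal sketch of the disable-cache idea (the path and surrounding names are illustrative, not the actual AMDelegationTokenRenewer code):
{code:title=Sketch (illustrative)|borderStyle=solid}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Disable the client-side FileSystem cache so each call builds a fresh client
// that picks up the newly written delegation token instead of the old one.
val hadoopConf = new Configuration()
hadoopConf.setBoolean("fs.hdfs.impl.disable.cache", true)

val credentialsPath = new Path("hdfs:///user/spark/credentials")  // illustrative path
val fs = FileSystem.get(credentialsPath.toUri, hadoopConf)        // not served from the cache
{code}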

(I only tested this over a very short period with the configuration:
dfs.namenode.delegation.token.renew-interval=3min
dfs.namenode.delegation.token.max-lifetime=10min
I'm not sure whether that matters.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-8687) Spark on yarn can't send `spark.yarn.credentials.file` to executor.

2015-06-28 Thread SaintBacchus (JIRA)
SaintBacchus created SPARK-8687:
---

 Summary: Spark on yarn can't send `spark.yarn.credentials.file` to 
executor.
 Key: SPARK-8687
 URL: https://issues.apache.org/jira/browse/SPARK-8687
 Project: Spark
  Issue Type: Bug
  Components: YARN
Affects Versions: 1.5.0
Reporter: SaintBacchus


YARN sets +spark.yarn.credentials.file+ after the *DriverEndpoint* has been initialized, 
so the executor fetches the old configuration, which causes the problem.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Closed] (SPARK-8163) CheckPoint mechanism did not work well when error happened in big streaming

2015-06-25 Thread SaintBacchus (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-8163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

SaintBacchus closed SPARK-8163.
---
Resolution: Not A Problem

I mistook the problem.

 CheckPoint mechanism did not work well when error happened in big streaming
 ---

 Key: SPARK-8163
 URL: https://issues.apache.org/jira/browse/SPARK-8163
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.4.0
Reporter: SaintBacchus

 I tested it with a Kafka DStream.
 Sometimes the Kafka producer had pushed a lot of data to the Kafka brokers, and the
 Streaming receiver then pulled this data without any rate limit.
 For this first batch, Streaming may take 10 or more seconds to consume the
 data (the batch interval was 2 seconds).
 To describe in more detail what Streaming was doing at that moment:
 the SparkContext was doing its job; the JobGenerator was still sending new batches to
 the StreamingContext, which wrote them to the checkpoint files; and
 the receiver was still busy receiving data from Kafka and also tracking
 these events in the checkpoint.
 Then an unexpected error occurred, shutting down the streaming
 application.
 When we then wanted to recover the application from the checkpoint files, the
 StreamingContext had already recorded the next few batches, so it was recovered
 from the last batch. The streaming job had therefore already missed the first batch and
 did not know what data had actually been consumed by the receiver.
 Setting spark.streaming.concurrentJobs=2 can avoid this problem, but some
 applications cannot do this.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-8619) Can't find the keytab file when recovering the streaming application.

2015-06-24 Thread SaintBacchus (JIRA)
SaintBacchus created SPARK-8619:
---

 Summary: Can't find the keytab file when recovering the streaming 
application.
 Key: SPARK-8619
 URL: https://issues.apache.org/jira/browse/SPARK-8619
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.5.0
Reporter: SaintBacchus


In a streaming application, I use *--keytab /root/spark.keytab* to get the 
token.
But when the streaming application failed and I wanted to recover it from the 
checkpoint file, there was this error:
{quote}
java.io.IOException: Login failure for spark/hadoop.hadoop@hadoop.com from 
keytab spark.keytab-1fd8f7bb-0d3c-4f65-990a-9ae09055cc8d: 
javax.security.auth.login.LoginException: Unable to obtain password from user
{quote}

Spark had changed the configuration, so the checkpoint recovery can't find the file:
{code:title=Client.scala @ Function: setupCredentials |borderStyle=solid}
  val keytabFileName = f.getName + "-" + UUID.randomUUID().toString
  UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
  loginFromKeytab = true
  sparkConf.set("spark.yarn.keytab", keytabFileName)
{code}

So when recovering the application, we should ignore this configuration.
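For illustration, a minimal sketch of what ignoring the stale value on recovery could look like (the helper and its handling of the user keytab are hypothetical, not the actual fix):
{code:title=Sketch (hypothetical)|borderStyle=solid}
import org.apache.spark.SparkConf

// The SparkConf restored from the checkpoint still holds the UUID-suffixed
// name (e.g. spark.keytab-1fd8f7bb-...), which no longer exists on disk.
def dropStaleKeytab(restored: SparkConf, userKeytab: Option[String]): SparkConf = {
  restored.remove("spark.yarn.keytab")                        // forget the stale generated name
  userKeytab.foreach(restored.set("spark.yarn.keytab", _))    // re-apply the path passed with --keytab
  restored
}
{code}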



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-8367) ReliableKafka will loss data when `spark.streaming.blockInterval` was 0

2015-06-14 Thread SaintBacchus (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-8367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

SaintBacchus updated SPARK-8367:

Description: 
{code:title=BlockGenerator.scala|borderStyle=solid}
  /** Change the buffer to which single records are added to. */
  private def updateCurrentBuffer(time: Long): Unit = synchronized {
    try {
      val newBlockBuffer = currentBuffer
      currentBuffer = new ArrayBuffer[Any]
      if (newBlockBuffer.size > 0) {
        val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
        val newBlock = new Block(blockId, newBlockBuffer)
        listener.onGenerateBlock(blockId)
        blocksForPushing.put(newBlock)  // put is blocking when queue is full
        logDebug("Last element in " + blockId + " is " + newBlockBuffer.last)
      }
    } catch {
      case ie: InterruptedException =>
        logInfo("Block updating timer thread was interrupted")
      case e: Exception =>
        reportError("Error in block updating thread", e)
    }
  }
{code}

If *spark.streaming.blockInterval* is 0, the *blockId* in the code will always
be the same, because *time* is 0 and *blockIntervalMs* is 0 too.

{code:title=ReliableKafkaReceiver.scala|borderStyle=solid}
  private def rememberBlockOffsets(blockId: StreamBlockId): Unit = {
    // Get a snapshot of current offset map and store with related block id.
    val offsetSnapshot = topicPartitionOffsetMap.toMap
    blockOffsetMap.put(blockId, offsetSnapshot)
    topicPartitionOffsetMap.clear()
  }
{code}
If the *blockId* is the same, Streaming will record the current data under a later
*offset*.
So when an exception occurs, the *offset* has been committed but the data will be lost.
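To make the collision concrete, a small illustrative snippet (values taken from the description above):
{code:title=Collision sketch (illustrative)|borderStyle=solid}
import org.apache.spark.storage.StreamBlockId

val receiverId = 0
val time = 0L             // per the description, time is 0 here
val blockIntervalMs = 0L  // spark.streaming.blockInterval = 0

val first  = StreamBlockId(receiverId, time - blockIntervalMs)  // StreamBlockId(0, 0)
val second = StreamBlockId(receiverId, time - blockIntervalMs)  // StreamBlockId(0, 0) again
assert(first == second)   // so blockOffsetMap.put(blockId, ...) keeps overwriting the same key
{code}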

  was:

{code:title=BlockGenerator.scala|borderStyle=solid}
  /** Change the buffer to which single records are added to. */
  private def updateCurrentBuffer(time: Long): Unit = synchronized {
    try {
      val newBlockBuffer = currentBuffer
      currentBuffer = new ArrayBuffer[Any]
      if (newBlockBuffer.size > 0) {
        val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
        val newBlock = new Block(blockId, newBlockBuffer)
        listener.onGenerateBlock(blockId)
        blocksForPushing.put(newBlock)  // put is blocking when queue is full
        logDebug("Last element in " + blockId + " is " + newBlockBuffer.last)
      }
    } catch {
      case ie: InterruptedException =>
        logInfo("Block updating timer thread was interrupted")
      case e: Exception =>
        reportError("Error in block updating thread", e)
    }
  }
{code}

If *spark.streaming.blockInterval* is 0, the *blockId* in the code will always
be the same, because *time* is 0 and *blockIntervalMs* is 0 too.

{code:title=ReliableKafkaReceiver.scala|borderStyle=solid}
  private def rememberBlockOffsets(blockId: StreamBlockId): Unit = {
    // Get a snapshot of current offset map and store with related block id.
    val offsetSnapshot = topicPartitionOffsetMap.toMap
    blockOffsetMap.put(blockId, offsetSnapshot)
    topicPartitionOffsetMap.clear()
  }
{code}
If the *blockId* is the same, Streaming will record the current data under the previous
*offset*.


 ReliableKafka will loss data when `spark.streaming.blockInterval` was 0
 ---

 Key: SPARK-8367
 URL: https://issues.apache.org/jira/browse/SPARK-8367
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.4.0
Reporter: SaintBacchus

 {code:title=BlockGenerator.scala|borderStyle=solid}
   /** Change the buffer to which single records are added to. */
   private def updateCurrentBuffer(time: Long): Unit = synchronized {
     try {
       val newBlockBuffer = currentBuffer
       currentBuffer = new ArrayBuffer[Any]
       if (newBlockBuffer.size > 0) {
         val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
         val newBlock = new Block(blockId, newBlockBuffer)
         listener.onGenerateBlock(blockId)
         blocksForPushing.put(newBlock)  // put is blocking when queue is full
         logDebug("Last element in " + blockId + " is " + newBlockBuffer.last)
       }
     } catch {
       case ie: InterruptedException =>
         logInfo("Block updating timer thread was interrupted")
       case e: Exception =>
         reportError("Error in block updating thread", e)
     }
   }
 {code}
 If *spark.streaming.blockInterval* is 0, the *blockId* in the code will
 always be the same, because *time* is 0 and *blockIntervalMs* is 0 too.
 {code:title=ReliableKafkaReceiver.scala|borderStyle=solid}
   private def rememberBlockOffsets(blockId: StreamBlockId): Unit = {
     // Get a snapshot of current offset map and store with related block id.
     val offsetSnapshot = topicPartitionOffsetMap.toMap
     blockOffsetMap.put(blockId, offsetSnapshot)
     topicPartitionOffsetMap.clear()
   }

[jira] [Updated] (SPARK-8367) ReliableKafka will loss data when `spark.streaming.blockInterval` was 0

2015-06-14 Thread SaintBacchus (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-8367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

SaintBacchus updated SPARK-8367:

Description: 
{code:title=BlockGenerator.scala|borderStyle=solid}
  /** Change the buffer to which single records are added to. */
  private def updateCurrentBuffer(time: Long): Unit = synchronized {
    try {
      val newBlockBuffer = currentBuffer
      currentBuffer = new ArrayBuffer[Any]
      if (newBlockBuffer.size > 0) {
        val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
        val newBlock = new Block(blockId, newBlockBuffer)
        listener.onGenerateBlock(blockId)
        blocksForPushing.put(newBlock)  // put is blocking when queue is full
        logDebug("Last element in " + blockId + " is " + newBlockBuffer.last)
      }
    } catch {
      case ie: InterruptedException =>
        logInfo("Block updating timer thread was interrupted")
      case e: Exception =>
        reportError("Error in block updating thread", e)
    }
  }
{code}

If *spark.streaming.blockInterval* is 0, the *blockId* in the code will always
be the same, because *time* is 0 and *blockIntervalMs* is 0 too.

{code:title=ReliableKafkaReceiver.scala|borderStyle=solid}
  private def rememberBlockOffsets(blockId: StreamBlockId): Unit = {
    // Get a snapshot of current offset map and store with related block id.
    val offsetSnapshot = topicPartitionOffsetMap.toMap
    blockOffsetMap.put(blockId, offsetSnapshot)
    topicPartitionOffsetMap.clear()
  }
{code}
If the *blockId* is the same, Streaming will commit the *offset* before the data is
really consumed.
So when an exception occurs, the *offset* has been committed but the data will be lost, since
the data was still in memory and not yet consumed.

  was:
{code:title=BlockGenerator.scala|borderStyle=solid}
  /** Change the buffer to which single records are added to. */
  private def updateCurrentBuffer(time: Long): Unit = synchronized {
    try {
      val newBlockBuffer = currentBuffer
      currentBuffer = new ArrayBuffer[Any]
      if (newBlockBuffer.size > 0) {
        val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
        val newBlock = new Block(blockId, newBlockBuffer)
        listener.onGenerateBlock(blockId)
        blocksForPushing.put(newBlock)  // put is blocking when queue is full
        logDebug("Last element in " + blockId + " is " + newBlockBuffer.last)
      }
    } catch {
      case ie: InterruptedException =>
        logInfo("Block updating timer thread was interrupted")
      case e: Exception =>
        reportError("Error in block updating thread", e)
    }
  }
{code}

If *spark.streaming.blockInterval* is 0, the *blockId* in the code will always
be the same, because *time* is 0 and *blockIntervalMs* is 0 too.

{code:title=ReliableKafkaReceiver.scala|borderStyle=solid}
  private def rememberBlockOffsets(blockId: StreamBlockId): Unit = {
    // Get a snapshot of current offset map and store with related block id.
    val offsetSnapshot = topicPartitionOffsetMap.toMap
    blockOffsetMap.put(blockId, offsetSnapshot)
    topicPartitionOffsetMap.clear()
  }
{code}
If the *blockId* is the same, Streaming will record the current data under a later
*offset*.
So when an exception occurs, the *offset* has been committed but the data will be lost, since
the data was still in memory and not yet consumed.


 ReliableKafka will loss data when `spark.streaming.blockInterval` was 0
 ---

 Key: SPARK-8367
 URL: https://issues.apache.org/jira/browse/SPARK-8367
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.4.0
Reporter: SaintBacchus

 {code:title=BlockGenerator.scala|borderStyle=solid}
   /** Change the buffer to which single records are added to. */
   private def updateCurrentBuffer(time: Long): Unit = synchronized {
     try {
       val newBlockBuffer = currentBuffer
       currentBuffer = new ArrayBuffer[Any]
       if (newBlockBuffer.size > 0) {
         val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
         val newBlock = new Block(blockId, newBlockBuffer)
         listener.onGenerateBlock(blockId)
         blocksForPushing.put(newBlock)  // put is blocking when queue is full
         logDebug("Last element in " + blockId + " is " + newBlockBuffer.last)
       }
     } catch {
       case ie: InterruptedException =>
         logInfo("Block updating timer thread was interrupted")
       case e: Exception =>
         reportError("Error in block updating thread", e)
     }
   }
 {code}
 If *spark.streaming.blockInterval* is 0, the *blockId* in the code will
 always be the same, because *time* is 0 and *blockIntervalMs* is 0 too.
 {code:title=ReliableKafkaReceiver.scala|borderStyle=solid}
   private def rememberBlockOffsets(blockId: StreamBlockId): Unit = {
     // Get a snapshot of current 

[jira] [Updated] (SPARK-8367) ReliableKafka will loss data when `spark.streaming.blockInterval` was 0

2015-06-14 Thread SaintBacchus (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-8367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

SaintBacchus updated SPARK-8367:

Description: 
{code:title=BlockGenerator.scala|borderStyle=solid}
  /** Change the buffer to which single records are added to. */
  private def updateCurrentBuffer(time: Long): Unit = synchronized {
    try {
      val newBlockBuffer = currentBuffer
      currentBuffer = new ArrayBuffer[Any]
      if (newBlockBuffer.size > 0) {
        val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
        val newBlock = new Block(blockId, newBlockBuffer)
        listener.onGenerateBlock(blockId)
        blocksForPushing.put(newBlock)  // put is blocking when queue is full
        logDebug("Last element in " + blockId + " is " + newBlockBuffer.last)
      }
    } catch {
      case ie: InterruptedException =>
        logInfo("Block updating timer thread was interrupted")
      case e: Exception =>
        reportError("Error in block updating thread", e)
    }
  }
{code}

If *spark.streaming.blockInterval* is 0, the *blockId* in the code will always
be the same, because *time* is 0 and *blockIntervalMs* is 0 too.

{code:title=ReliableKafkaReceiver.scala|borderStyle=solid}
  private def rememberBlockOffsets(blockId: StreamBlockId): Unit = {
    // Get a snapshot of current offset map and store with related block id.
    val offsetSnapshot = topicPartitionOffsetMap.toMap
    blockOffsetMap.put(blockId, offsetSnapshot)
    topicPartitionOffsetMap.clear()
  }
{code}
If the *blockId* is the same, Streaming will record the current data under a later
*offset*.
So when an exception occurs, the *offset* has been committed but the data will be lost, since
the data was still in memory and not yet consumed.

  was:
{code:title=BlockGenerator.scala|borderStyle=solid}
  /** Change the buffer to which single records are added to. */
  private def updateCurrentBuffer(time: Long): Unit = synchronized {
    try {
      val newBlockBuffer = currentBuffer
      currentBuffer = new ArrayBuffer[Any]
      if (newBlockBuffer.size > 0) {
        val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
        val newBlock = new Block(blockId, newBlockBuffer)
        listener.onGenerateBlock(blockId)
        blocksForPushing.put(newBlock)  // put is blocking when queue is full
        logDebug("Last element in " + blockId + " is " + newBlockBuffer.last)
      }
    } catch {
      case ie: InterruptedException =>
        logInfo("Block updating timer thread was interrupted")
      case e: Exception =>
        reportError("Error in block updating thread", e)
    }
  }
{code}

If *spark.streaming.blockInterval* is 0, the *blockId* in the code will always
be the same, because *time* is 0 and *blockIntervalMs* is 0 too.

{code:title=ReliableKafkaReceiver.scala|borderStyle=solid}
  private def rememberBlockOffsets(blockId: StreamBlockId): Unit = {
    // Get a snapshot of current offset map and store with related block id.
    val offsetSnapshot = topicPartitionOffsetMap.toMap
    blockOffsetMap.put(blockId, offsetSnapshot)
    topicPartitionOffsetMap.clear()
  }
{code}
If the *blockId* is the same, Streaming will record the current data under a later
*offset*.
So when an exception occurs, the *offset* has been committed but the data will be lost.


 ReliableKafka will loss data when `spark.streaming.blockInterval` was 0
 ---

 Key: SPARK-8367
 URL: https://issues.apache.org/jira/browse/SPARK-8367
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.4.0
Reporter: SaintBacchus

 {code:title=BlockGenerator.scala|borderStyle=solid}
   /** Change the buffer to which single records are added to. */
   private def updateCurrentBuffer(time: Long): Unit = synchronized {
     try {
       val newBlockBuffer = currentBuffer
       currentBuffer = new ArrayBuffer[Any]
       if (newBlockBuffer.size > 0) {
         val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
         val newBlock = new Block(blockId, newBlockBuffer)
         listener.onGenerateBlock(blockId)
         blocksForPushing.put(newBlock)  // put is blocking when queue is full
         logDebug("Last element in " + blockId + " is " + newBlockBuffer.last)
       }
     } catch {
       case ie: InterruptedException =>
         logInfo("Block updating timer thread was interrupted")
       case e: Exception =>
         reportError("Error in block updating thread", e)
     }
   }
 {code}
 If *spark.streaming.blockInterval* is 0, the *blockId* in the code will
 always be the same, because *time* is 0 and *blockIntervalMs* is 0 too.
 {code:title=ReliableKafkaReceiver.scala|borderStyle=solid}
   private def rememberBlockOffsets(blockId: StreamBlockId): Unit = {
     // Get a snapshot of current offset map and store with related block id.
     val offsetSnapshot 

[jira] [Updated] (SPARK-8367) ReliableKafka will loss data when `spark.streaming.blockInterval` was 0

2015-06-14 Thread SaintBacchus (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-8367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

SaintBacchus updated SPARK-8367:

Description: 
{code:title=BlockGenerator.scala|borderStyle=solid}
  /** Change the buffer to which single records are added to. */
  private def updateCurrentBuffer(time: Long): Unit = synchronized {
    try {
      val newBlockBuffer = currentBuffer
      currentBuffer = new ArrayBuffer[Any]
      if (newBlockBuffer.size > 0) {
        val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
        val newBlock = new Block(blockId, newBlockBuffer)
        listener.onGenerateBlock(blockId)
        blocksForPushing.put(newBlock)  // put is blocking when queue is full
        logDebug("Last element in " + blockId + " is " + newBlockBuffer.last)
      }
    } catch {
      case ie: InterruptedException =>
        logInfo("Block updating timer thread was interrupted")
      case e: Exception =>
        reportError("Error in block updating thread", e)
    }
  }
{code}

If *spark.streaming.blockInterval* is 0, the *blockId* in the code will always
be the same, because *time* is 0 and *blockIntervalMs* is 0 too.

{code:title=ReliableKafkaReceiver.scala|borderStyle=solid}
  private def rememberBlockOffsets(blockId: StreamBlockId): Unit = {
    // Get a snapshot of current offset map and store with related block id.
    val offsetSnapshot = topicPartitionOffsetMap.toMap
    blockOffsetMap.put(blockId, offsetSnapshot)
    topicPartitionOffsetMap.clear()
  }
{code}
If the *blockId* is the same, Streaming will commit the *offset* before the data is
really consumed (the data is still waiting to be stored, but the offset has already been
updated and committed by the previous commit).
So when an exception occurs, the *offset* has been committed but the data will be lost, since
the data was still in memory and not yet consumed.

  was:
{code:title=BlockGenerator.scala|borderStyle=solid}
  /** Change the buffer to which single records are added to. */
  private def updateCurrentBuffer(time: Long): Unit = synchronized {
    try {
      val newBlockBuffer = currentBuffer
      currentBuffer = new ArrayBuffer[Any]
      if (newBlockBuffer.size > 0) {
        val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
        val newBlock = new Block(blockId, newBlockBuffer)
        listener.onGenerateBlock(blockId)
        blocksForPushing.put(newBlock)  // put is blocking when queue is full
        logDebug("Last element in " + blockId + " is " + newBlockBuffer.last)
      }
    } catch {
      case ie: InterruptedException =>
        logInfo("Block updating timer thread was interrupted")
      case e: Exception =>
        reportError("Error in block updating thread", e)
    }
  }
{code}

If *spark.streaming.blockInterval* is 0, the *blockId* in the code will always
be the same, because *time* is 0 and *blockIntervalMs* is 0 too.

{code:title=ReliableKafkaReceiver.scala|borderStyle=solid}
  private def rememberBlockOffsets(blockId: StreamBlockId): Unit = {
    // Get a snapshot of current offset map and store with related block id.
    val offsetSnapshot = topicPartitionOffsetMap.toMap
    blockOffsetMap.put(blockId, offsetSnapshot)
    topicPartitionOffsetMap.clear()
  }
{code}
If the *blockId* is the same, Streaming will commit the *offset* before the data is
really consumed.
So when an exception occurs, the *offset* has been committed but the data will be lost, since
the data was still in memory and not yet consumed.


 ReliableKafka will loss data when `spark.streaming.blockInterval` was 0
 ---

 Key: SPARK-8367
 URL: https://issues.apache.org/jira/browse/SPARK-8367
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.4.0
Reporter: SaintBacchus

 {code:title=BlockGenerator.scala|borderStyle=solid}
   /** Change the buffer to which single records are added to. */
   private def updateCurrentBuffer(time: Long): Unit = synchronized {
     try {
       val newBlockBuffer = currentBuffer
       currentBuffer = new ArrayBuffer[Any]
       if (newBlockBuffer.size > 0) {
         val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
         val newBlock = new Block(blockId, newBlockBuffer)
         listener.onGenerateBlock(blockId)
         blocksForPushing.put(newBlock)  // put is blocking when queue is full
         logDebug("Last element in " + blockId + " is " + newBlockBuffer.last)
       }
     } catch {
       case ie: InterruptedException =>
         logInfo("Block updating timer thread was interrupted")
       case e: Exception =>
         reportError("Error in block updating thread", e)
     }
   }
 {code}
 If *spark.streaming.blockInterval* is 0, the *blockId* in the code will
 always be the same, because *time* is 0 and *blockIntervalMs* is 0 too.
 

[jira] [Reopened] (SPARK-8163) CheckPoint mechanism did not work well when error happened in big streaming

2015-06-08 Thread SaintBacchus (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-8163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

SaintBacchus reopened SPARK-8163:
-

 CheckPoint mechanism did not work well when error happened in big streaming
 ---

 Key: SPARK-8163
 URL: https://issues.apache.org/jira/browse/SPARK-8163
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.4.0
Reporter: SaintBacchus

 I tested it with a Kafka DStream.
 Sometimes the Kafka producer had pushed a lot of data to the Kafka brokers, and the
 Streaming receiver then pulled this data without any rate limit.
 For this first batch, Streaming may take 10 or more seconds to consume the
 data (the batch interval was 2 seconds).
 To describe in more detail what Streaming was doing at that moment:
 the SparkContext was doing its job; the JobGenerator was still sending new batches to
 the StreamingContext, which wrote them to the checkpoint files; and
 the receiver was still busy receiving data from Kafka and also tracking
 these events in the checkpoint.
 Then an unexpected error occurred, shutting down the streaming
 application.
 When we then wanted to recover the application from the checkpoint files, the
 StreamingContext had already recorded the next few batches, so it was recovered
 from the last batch. The streaming job had therefore already missed the first batch and
 did not know what data had actually been consumed by the receiver.
 Setting spark.streaming.concurrentJobs=2 can avoid this problem, but some
 applications cannot do this.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-8163) CheckPoint mechanism did not work well when error happened in big streaming

2015-06-08 Thread SaintBacchus (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-8163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14578161#comment-14578161
 ] 

SaintBacchus commented on SPARK-8163:
-

Hi [~sowen], the description is the problem exactly as I encountered it.
Since my English is poor, I think you may not have understood what I was saying:
First, the producer pushed a lot of data to the Kafka brokers.
Second, after a while (about 10s), the streaming application was shut down.
Third, it was recovered from the checkpoint file.

The result is that Streaming skipped many batches.

I really think this is a big problem, so I am reopening this issue.

 CheckPoint mechanism did not work well when error happened in big streaming
 ---

 Key: SPARK-8163
 URL: https://issues.apache.org/jira/browse/SPARK-8163
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.4.0
Reporter: SaintBacchus

 I tested it with a Kafka DStream.
 Sometimes the Kafka producer had pushed a lot of data to the Kafka brokers, and the
 Streaming receiver then pulled this data without any rate limit.
 For this first batch, Streaming may take 10 or more seconds to consume the
 data (the batch interval was 2 seconds).
 To describe in more detail what Streaming was doing at that moment:
 the SparkContext was doing its job; the JobGenerator was still sending new batches to
 the StreamingContext, which wrote them to the checkpoint files; and
 the receiver was still busy receiving data from Kafka and also tracking
 these events in the checkpoint.
 Then an unexpected error occurred, shutting down the streaming
 application.
 When we then wanted to recover the application from the checkpoint files, the
 StreamingContext had already recorded the next few batches, so it was recovered
 from the last batch. The streaming job had therefore already missed the first batch and
 did not know what data had actually been consumed by the receiver.
 Setting spark.streaming.concurrentJobs=2 can avoid this problem, but some
 applications cannot do this.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-8163) CheckPoint mechanism did not work well when error happened in big streaming

2015-06-08 Thread SaintBacchus (JIRA)
SaintBacchus created SPARK-8163:
---

 Summary: CheckPoint mechanism did not work well when error 
happened in big streaming
 Key: SPARK-8163
 URL: https://issues.apache.org/jira/browse/SPARK-8163
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.4.0
Reporter: SaintBacchus
 Fix For: 1.5.0


I tested it with a Kafka DStream.
Sometimes the Kafka producer had pushed a lot of data to the Kafka brokers, and the
Streaming receiver then pulled this data without any rate limit.
For this first batch, Streaming may take 10 or more seconds to consume the
data (the batch interval was 2 seconds).
To describe in more detail what Streaming was doing at that moment:
the SparkContext was doing its job; the JobGenerator was still sending new batches to
the StreamingContext, which wrote them to the checkpoint files; and
the receiver was still busy receiving data from Kafka and also tracking these
events in the checkpoint.
Then an unexpected error occurred, shutting down the streaming
application.
When we then wanted to recover the application from the checkpoint files, the
StreamingContext had already recorded the next few batches, so it was recovered from the
last batch. The streaming job had therefore already missed the first batch and did not
know what data had actually been consumed by the receiver.
Setting spark.streaming.concurrentJobs=2 can avoid this problem, but some
applications cannot do this.
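For reference, a minimal sketch of the workaround configuration mentioned above (the app name is illustrative):
{code:title=Workaround sketch (illustrative)|borderStyle=solid}
import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
  .setAppName("kafka-wordcount")                   // illustrative app name
  .set("spark.streaming.concurrentJobs", "2")      // run receiver and batch jobs concurrently
{code}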



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-8119) Spark will set total executor when some executors fail.

2015-06-04 Thread SaintBacchus (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-8119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

SaintBacchus updated SPARK-8119:

Description: 
DynamicAllocation sets the total number of executors to a small value when it wants
to kill some executors.
But in the non-DynamicAllocation scenario, Spark also sets the total number of executors.
This causes the following problem: when an executor fails, no replacement
executor is brought up by Spark.

  was:
DynamicAllocation sets the total number of executors to a small value when it wants
to kill some executors.
But in the non-DynamicAllocation scenario, Spark also sets the total number of executors.
This causes the following problem: when an executor fails, no replacement
executor is brought up by Spark.


 Spark will set total executor when some executors fail.
 ---

 Key: SPARK-8119
 URL: https://issues.apache.org/jira/browse/SPARK-8119
 Project: Spark
  Issue Type: Bug
  Components: Scheduler
Affects Versions: 1.4.0
Reporter: SaintBacchus
 Fix For: 1.4.0


 DynamicAllocation sets the total number of executors to a small value when it
 wants to kill some executors.
 But in the non-DynamicAllocation scenario, Spark also sets the total number of executors.
 This causes the following problem: when an executor fails, no replacement
 executor is brought up by Spark.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-8119) Spark will set total executor when some executors fail.

2015-06-04 Thread SaintBacchus (JIRA)
SaintBacchus created SPARK-8119:
---

 Summary: Spark will set total executor when some executors fail.
 Key: SPARK-8119
 URL: https://issues.apache.org/jira/browse/SPARK-8119
 Project: Spark
  Issue Type: Bug
  Components: Scheduler
Affects Versions: 1.4.0
Reporter: SaintBacchus
 Fix For: 1.4.0


DynamicAllocation sets the total number of executors to a small value when it wants
to kill some executors.
But in the non-DynamicAllocation scenario, Spark also sets the total number of executors.
This causes the following problem: when an executor fails, no replacement
executor is brought up by Spark.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-7942) Receiver's life cycle is inconsistent with streaming job.

2015-06-03 Thread SaintBacchus (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-7942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14572212#comment-14572212
 ] 

SaintBacchus commented on SPARK-7942:
-

[~tdas] I'm still not clear about how to handle the case where only some of the receivers have
broken down. Should we shut down the StreamingContext? Or ignore this and leave
the surviving receivers running?

 Receiver's life cycle is inconsistent with streaming job.
 -

 Key: SPARK-7942
 URL: https://issues.apache.org/jira/browse/SPARK-7942
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.4.0
Reporter: SaintBacchus

 Streaming treats the receiver as a common Spark job, so if an error
 occurs in the receiver's logic (after 4 retries by default), streaming
 will no longer get any data, but the streaming job keeps running.
 A typical scenario: we set
 `spark.streaming.receiver.writeAheadLog.enable` to true to use the
 `ReliableKafkaReceiver` but do not set the checkpoint dir. The receiver
 is then soon shut down, but the streaming job stays alive.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-7942) Receiver's life cycle is inconsistent with streaming job.

2015-05-29 Thread SaintBacchus (JIRA)
SaintBacchus created SPARK-7942:
---

 Summary: Receiver's life cycle is inconsistent with streaming job.
 Key: SPARK-7942
 URL: https://issues.apache.org/jira/browse/SPARK-7942
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.4.0
Reporter: SaintBacchus


Streaming treats the receiver as a common Spark job, so if an error occurs
in the receiver's logic (after 4 retries by default), streaming will no
longer get any data, but the streaming job keeps running.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-7942) Receiver's life cycle is inconsistent with streaming job.

2015-05-29 Thread SaintBacchus (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-7942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14564411#comment-14564411
 ] 

SaintBacchus commented on SPARK-7942:
-

[~tdas] Should we add logic to shut down the StreamingContext if the receiver
has gone down?

 Receiver's life cycle is inconsistent with streaming job.
 -

 Key: SPARK-7942
 URL: https://issues.apache.org/jira/browse/SPARK-7942
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.4.0
Reporter: SaintBacchus

 Streaming treats the receiver as a common Spark job, so if an error
 occurs in the receiver's logic (after 4 retries by default), streaming
 will no longer get any data, but the streaming job keeps running.
 A typical scenario: we set
 `spark.streaming.receiver.writeAheadLog.enable` to true to use the
 `ReliableKafkaReceiver` but do not set the checkpoint dir. The receiver
 is then soon shut down, but the streaming job stays alive.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-7942) Receiver's life cycle is inconsistent with streaming job.

2015-05-29 Thread SaintBacchus (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-7942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

SaintBacchus updated SPARK-7942:

Description: 
Streaming treats the receiver as a common Spark job, so if an error occurs
in the receiver's logic (after 4 retries by default), streaming will no
longer get any data, but the streaming job keeps running.
A typical scenario: we set
`spark.streaming.receiver.writeAheadLog.enable` to true to use the
`ReliableKafkaReceiver` but do not set the checkpoint dir. The receiver
is then soon shut down, but the streaming job stays alive.
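A minimal sketch of this configuration combination (the path, batch duration, and app name are illustrative):
{code:title=Config sketch (illustrative)|borderStyle=solid}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("reliable-kafka")                                    // illustrative
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")    // turns on the WAL-backed receiver path

val ssc = new StreamingContext(conf, Seconds(2))
ssc.checkpoint("hdfs:///path/to/checkpoint")  // without this, the WAL receiver fails and shuts down
{code}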

  was:Streaming treats the receiver as a common Spark job, so if an error occurs
in the receiver's logic (after 4 retries by default), streaming will no
longer get any data, but the streaming job keeps running.


 Receiver's life cycle is inconsistent with streaming job.
 -

 Key: SPARK-7942
 URL: https://issues.apache.org/jira/browse/SPARK-7942
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.4.0
Reporter: SaintBacchus

 Streaming treats the receiver as a common Spark job, so if an error
 occurs in the receiver's logic (after 4 retries by default), streaming
 will no longer get any data, but the streaming job keeps running.
 A typical scenario: we set
 `spark.streaming.receiver.writeAheadLog.enable` to true to use the
 `ReliableKafkaReceiver` but do not set the checkpoint dir. The receiver
 is then soon shut down, but the streaming job stays alive.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-6955) Do not let Yarn Shuffle Server retry its server port.

2015-04-15 Thread SaintBacchus (JIRA)
SaintBacchus created SPARK-6955:
---

 Summary: Do not let Yarn Shuffle Server retry its server port.
 Key: SPARK-6955
 URL: https://issues.apache.org/jira/browse/SPARK-6955
 Project: Spark
  Issue Type: Bug
  Components: Shuffle, YARN
Affects Versions: 1.4.0
Reporter: SaintBacchus


It's better to let the NodeManager go down rather than retry on another port when
`spark.shuffle.service.port` conflicts while starting the Spark YARN shuffle
server, because the retry mechanism makes the shuffle port inconsistent and also
makes clients fail to find the port.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-6605) Same transformation in DStream leads to different result

2015-03-30 Thread SaintBacchus (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6605?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14386324#comment-14386324
 ] 

SaintBacchus commented on SPARK-6605:
-

Hi [~tdas], can you have a look at this problem? Is this acceptable for
streaming users?

 Same transformation in DStream leads to different result
 

 Key: SPARK-6605
 URL: https://issues.apache.org/jira/browse/SPARK-6605
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.3.0
Reporter: SaintBacchus
 Fix For: 1.4.0


 The transformation *reduceByKeyAndWindow* has two implementations: one uses
 *WindowedDStream* and the other uses *ReducedWindowedDStream*.
 The results are always the same, except when an empty window occurs.
 Taking word count as an example, if a period of time (larger than the window
 duration) has no incoming data, the first *reduceByKeyAndWindow* has no elements
 inside, but the second has many elements with zero values inside.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-6605) Same transformation in DStream leads to different result

2015-03-30 Thread SaintBacchus (JIRA)
SaintBacchus created SPARK-6605:
---

 Summary: Same transformation in DStream leads to different result
 Key: SPARK-6605
 URL: https://issues.apache.org/jira/browse/SPARK-6605
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.3.0
Reporter: SaintBacchus
 Fix For: 1.4.0


The transformation *reduceByKeyAndWindow* has two implementations: one uses
*WindowedDStream* and the other uses *ReducedWindowedDStream*.
The results are always the same, except when an empty window occurs.
Taking word count as an example, if a period of time (larger than the window
duration) has no incoming data, the first *reduceByKeyAndWindow* has no elements
inside, but the second has many elements with zero values inside.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-6605) Same transformation in DStream leads to different result

2015-03-30 Thread SaintBacchus (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6605?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14386610#comment-14386610
 ] 

SaintBacchus commented on SPARK-6605:
-

Yeah, [~srowen], it's not a wrong answer, just a little different from what
we expect. It's caused by the two different implementations.
But I am not sure whether we should fix it to behave like the first case or let users deal with
the empty results using *filter*.
If we want to fix it, changing {{invFunc}} to {{(V, V) => Option\[V\]}} is a
good idea, or adding a {{filterFunc}} is also OK for simplicity.

 Same transformation in DStream leads to different result
 

 Key: SPARK-6605
 URL: https://issues.apache.org/jira/browse/SPARK-6605
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.3.0
Reporter: SaintBacchus
 Fix For: 1.4.0


 The transformation *reduceByKeyAndWindow* has two implementations: one uses
 *WindowedDStream* and the other uses *ReducedWindowedDStream*.
 The results are always the same, except when an empty window occurs.
 Taking word count as an example, if a period of time (larger than the window
 duration) has no incoming data, the first *reduceByKeyAndWindow* has no elements
 inside, but the second has many elements with zero values inside.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-6605) Same transformation in DStream leads to different result

2015-03-30 Thread SaintBacchus (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6605?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14386540#comment-14386540
 ] 

SaintBacchus commented on SPARK-6605:
-

Hi [~srowen], my test code is this:
{code:title=test.scala|borderStyle=solid}
val words = ssc.socketTextStream(sIp, sPort).flatMap(_.split(" ")).map(x => (x, 1))
val resultWindow3 = words.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(winDur), Seconds(slideDur))
val resultWindow4 = words.reduceByKeyAndWindow(_ + _, _ - _, Seconds(winDur), Seconds(slideDur))
{code}
This *resultWindow3* is implemented by
{code:title=PairDStreamFunctions.scala|borderStyle=solid}
    val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
    self.reduceByKey(cleanedReduceFunc, partitioner)
        .window(windowDuration, slideDuration)
        .reduceByKey(cleanedReduceFunc, partitioner)
{code}
And *resultWindow4* is implemented by
{code:title=PairDStreamFunctions.scala|borderStyle=solid}
val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
val cleanedInvReduceFunc = ssc.sc.clean(invReduceFunc)
val cleanedFilterFunc = if (filterFunc != null) 
Some(ssc.sc.clean(filterFunc)) else None
new ReducedWindowedDStream[K, V](
  self, cleanedReduceFunc, cleanedInvReduceFunc, cleanedFilterFunc,
  windowDuration, slideDuration, partitioner
)
{code}
The result of this test code is:
{quote}
= resultWindow3 is:
= resultWindow4 is:
(hello,0)
(world,0)
{quote}
*resultWindow3* is empty but *resultWindow4* has two elements whose keys were 
received before.


 Same transformation in DStream leads to different result
 

 Key: SPARK-6605
 URL: https://issues.apache.org/jira/browse/SPARK-6605
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.3.0
Reporter: SaintBacchus
 Fix For: 1.4.0


 The transformation *reduceByKeyAndWindow* has two implementations: one uses
 *WindowedDStream* and the other uses *ReducedWindowedDStream*.
 The results are always the same, except when an empty window occurs.
 Taking word count as an example, if a period of time (larger than the window
 duration) has no incoming data, the first *reduceByKeyAndWindow* has no elements
 inside, but the second has many elements with zero values inside.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-6605) Same transformation in DStream leads to different result

2015-03-30 Thread SaintBacchus (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6605?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14387811#comment-14387811
 ] 

SaintBacchus edited comment on SPARK-6605 at 3/31/15 1:54 AM:
--

{{reduceByKeyAndWindow}} has two implementations, and they lead to two different
results when an empty window occurs.
But we consider this a difference, not a problem. If a user wants to remove the
empty keys when using {{ReducedWindowedDStream}}, they can add a {{filter}} function
to remove them.


was (Author: carlmartin):
{{reduceByKeyAndWindow}} has two implementations, and they lead to two different
results when an empty window occurs.
But we consider this a difference, not a problem. If a user wants to remove the
empty keys when using {{ReducedWindowedDStream}}, they can add a {{filter}} function
to remove them.

 Same transformation in DStream leads to different result
 

 Key: SPARK-6605
 URL: https://issues.apache.org/jira/browse/SPARK-6605
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.3.0
Reporter: SaintBacchus
 Fix For: 1.4.0


 The transformation *reduceByKeyAndWindow* has two implementations: one uses
 *WindowedDStream* and the other uses *ReducedWindowedDStream*.
 The results are always the same, except when an empty window occurs.
 Taking word count as an example, if a period of time (larger than the window
 duration) has no incoming data, the first *reduceByKeyAndWindow* has no elements
 inside, but the second has many elements with zero values inside.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-6605) Same transformation in DStream leads to different result

2015-03-30 Thread SaintBacchus (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-6605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

SaintBacchus resolved SPARK-6605.
-
Resolution: Won't Fix

{{reduceByKeyAndWindow}} has two implementations, and they lead to two different
results when an empty window occurs.
But we consider this a difference, not a problem. If a user wants to remove the
empty keys when using {{ReducedWindowedDStream}}, they can add a {{filter}} function
to remove them.

 Same transformation in DStream leads to different result
 

 Key: SPARK-6605
 URL: https://issues.apache.org/jira/browse/SPARK-6605
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.3.0
Reporter: SaintBacchus
 Fix For: 1.4.0


 The transformation *reduceByKeyAndWindow* has two implementations: one uses
 *WindowedDStream* and the other uses *ReducedWindowedDStream*.
 The results are always the same, except when an empty window occurs.
 Taking word count as an example, if a period of time (larger than the window
 duration) has no incoming data, the first *reduceByKeyAndWindow* has no elements
 inside, but the second has many elements with zero values inside.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-6582) Support ssl for this AvroSink in Spark Streaming External

2015-03-28 Thread SaintBacchus (JIRA)
SaintBacchus created SPARK-6582:
---

 Summary: Support ssl for this AvroSink in Spark Streaming External
 Key: SPARK-6582
 URL: https://issues.apache.org/jira/browse/SPARK-6582
 Project: Spark
  Issue Type: Improvement
Reporter: SaintBacchus
 Fix For: 1.4.0


AvroSink already supports *ssl*, so it's better to support *ssl* in the
Spark Streaming External Flume module as well.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-6584) Provide ExecutorPrefixTaskLocation to support the rdd which can be aware of partition's executor location.

2015-03-28 Thread SaintBacchus (JIRA)
SaintBacchus created SPARK-6584:
---

 Summary: Provide ExecutorPrefixTaskLocation to support the rdd 
which can be aware of partition's executor  location.
 Key: SPARK-6584
 URL: https://issues.apache.org/jira/browse/SPARK-6584
 Project: Spark
  Issue Type: Sub-task
Affects Versions: 1.4.0
Reporter: SaintBacchus


The function *RDD.getPreferredLocations* can only express host-level preferred
locations.
If an *RDD* wants to be scheduled per executor (such as a BlockRDD), Spark can do
nothing about it.
So I want to provide *ExecutorPrefixTaskLocation* to support RDDs that are aware
of a partition's executor location. This mechanism can avoid data
transfer when there are many executors on the same host.
I think it's very useful, especially for *SparkStreaming*, since the *Receiver*
saves data into the *BlockManager*, which then becomes a BlockRDD.
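To make the gap concrete, a minimal sketch of an RDD that can only hint hosts today (the class and host name are illustrative):
{code:title=Sketch (illustrative)|borderStyle=solid}
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

class HostOnlyRDD(sc: SparkContext) extends RDD[Int](sc, Nil) {
  override def compute(split: Partition, context: TaskContext): Iterator[Int] = Iterator.empty
  override protected def getPartitions: Array[Partition] = Array.empty[Partition]
  // Only host-level hints are possible today; the proposal is to also allow
  // "this partition prefers executor X on host Y" for cases like BlockRDD.
  override protected def getPreferredLocations(split: Partition): Seq[String] = Seq("host-1")
}
{code}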



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-6464) Add a new transformation of rdd named processCoalesce which was particularly to deal with the small and cached rdd

2015-03-28 Thread SaintBacchus (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-6464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

SaintBacchus updated SPARK-6464:

Affects Version/s: (was: 1.3.0)
   1.4.0

 Add a new transformation of rdd named processCoalesce which was  particularly 
 to deal with the small and cached rdd
 ---

 Key: SPARK-6464
 URL: https://issues.apache.org/jira/browse/SPARK-6464
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 1.4.0
Reporter: SaintBacchus
 Attachments: screenshot-1.png


 Nowadays, the *coalesce* transformation is often used to expand or reduce
 the number of partitions in order to get good performance.
 But *coalesce* can't make sure that a child partition will be executed on
 the same executor as its parent partitions, which leads to a large amount of
 network transfer.
 In some scenarios, such as the +small and cached rdd+ mentioned in the title, we
 want to coalesce all the partitions on the same executor into one partition
 and make sure the child partition is executed on that executor. This avoids
 network transfer, reduces task scheduling overhead, and frees the CPU cores
 to do other work.
 In this scenario, our performance improved by 20% compared to before.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-6464) Add a new transformation of rdd named processCoalesce which was particularly to deal with the small and cached rdd

2015-03-23 Thread SaintBacchus (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-6464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

SaintBacchus updated SPARK-6464:

Description: 
Nowadays, the *coalesce* transformation is often used to expand or reduce the
number of partitions in order to get good performance.
But *coalesce* can't make sure that a child partition will be executed on the
same executor as its parent partitions, which leads to a large amount of
network transfer.
In some scenarios, such as the +small and cached rdd+ mentioned in the title, we
want to coalesce all the partitions on the same executor into one partition and
make sure the child partition is executed on that executor. This avoids
network transfer, reduces task scheduling overhead, and frees the CPU cores to do other work.
In this scenario, our performance improved by 20% compared to before.
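A minimal sketch of the current behaviour this proposal wants to improve (assuming an existing SparkContext {{sc}}; numbers are illustrative):
{code:title=Sketch (illustrative)|borderStyle=solid}
// A small, cached RDD with many partitions spread across executors.
val small = sc.parallelize(1 to 1000, 64).cache()
small.count()  // materialize the cache

// coalesce reduces the partition count, but a child partition may still be
// scheduled on a different executor than the cached parent partitions,
// which forces the cached blocks across the network.
val merged = small.coalesce(8)
{code}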

  was:
Nowadays, the *coalesce* transformation is often used to expand or reduce the
number of partitions in order to get good performance.
But *coalesce* can't make sure that a child partition will be executed on the
same executor as its parent partitions, which leads to a large amount of
network transfer.
In some scenarios, such as the one I mentioned in the title 


 Add a new transformation of rdd named processCoalesce which was  particularly 
 to deal with the small and cached rdd
 ---

 Key: SPARK-6464
 URL: https://issues.apache.org/jira/browse/SPARK-6464
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 1.3.0
Reporter: SaintBacchus

 Nowadays, the *coalesce* transformation is often used to expand or reduce
 the number of partitions in order to get good performance.
 But *coalesce* can't make sure that a child partition will be executed on
 the same executor as its parent partitions, which leads to a large amount of
 network transfer.
 In some scenarios, such as the +small and cached rdd+ mentioned in the title, we
 want to coalesce all the partitions on the same executor into one partition
 and make sure the child partition is executed on that executor. This avoids
 network transfer, reduces task scheduling overhead, and frees the CPU cores
 to do other work.
 In this scenario, our performance improved by 20% compared to before.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-6464) Add a new transformation of rdd named processCoalesce which was particularly to deal with the small and cached rdd

2015-03-23 Thread SaintBacchus (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-6464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

SaintBacchus updated SPARK-6464:

Description: 
Nowadays, the transformation *coalesce* was always used to expand or reduce the 
number of the partition in order to gain a good performance.
But *coalesce* can't make sure that the child partition will be executed in the 
same executor as the parent partition. And this will lead to have a large 
network transfer.
In some scenario such as I metioned in the title 

 Add a new transformation of rdd named processCoalesce which was  particularly 
 to deal with the small and cached rdd
 ---

 Key: SPARK-6464
 URL: https://issues.apache.org/jira/browse/SPARK-6464
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 1.3.0
Reporter: SaintBacchus

 Nowadays, the *coalesce* transformation is often used to expand or reduce
 the number of partitions in order to get good performance.
 But *coalesce* can't make sure that a child partition will be executed on
 the same executor as its parent partitions, which leads to a large amount of
 network transfer.
 In some scenarios, such as the one I mentioned in the title 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-6464) Add a new transformation of rdd named processCoalesce which was particularly to deal with the small and cached rdd

2015-03-23 Thread SaintBacchus (JIRA)
SaintBacchus created SPARK-6464:
---

 Summary: Add a new transformation of rdd named processCoalesce 
which was  particularly to deal with the small and cached rdd
 Key: SPARK-6464
 URL: https://issues.apache.org/jira/browse/SPARK-6464
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 1.3.0
Reporter: SaintBacchus






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-6464) Add a new transformation of rdd named processCoalesce which was particularly to deal with the small and cached rdd

2015-03-23 Thread SaintBacchus (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-6464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

SaintBacchus updated SPARK-6464:

Description: 
Nowadays, the transformation *coalesce* is commonly used to expand or reduce the 
number of partitions in order to get good performance.
But *coalesce* can't guarantee that a child partition will be executed on the 
same executor as its parent partitions, and this can lead to a large amount of 
network transfer.
In some scenarios, such as the +small and cached rdd+ case mentioned in the 
title, we want to coalesce all the partitions on the same executor into one 
partition and make sure the child partition is executed on that executor. This 
avoids network transfer, reduces the number of tasks to schedule, and frees the 
cpu cores for other jobs. 
In this scenario, our performance improved by about 20% compared with before.


  was:
Nowadays, the transformation *coalesce* is commonly used to expand or reduce the 
number of partitions in order to get good performance.
But *coalesce* can't guarantee that a child partition will be executed on the 
same executor as its parent partitions, and this can lead to a large amount of 
network transfer.
In some scenarios, such as the +small and cached rdd+ case mentioned in the 
title, we want to coalesce all the partitions on the same executor into one 
partition and make sure the child partition is executed on that executor. This 
avoids network transfer, reduces the number of tasks to schedule, and frees the 
cpu cores for other jobs. 
In this scenario, our performance improved by about 20% compared with before.


 Add a new transformation of rdd named processCoalesce which was  particularly 
 to deal with the small and cached rdd
 ---

 Key: SPARK-6464
 URL: https://issues.apache.org/jira/browse/SPARK-6464
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 1.3.0
Reporter: SaintBacchus

 Nowadays, the transformation *coalesce* is commonly used to expand or reduce 
 the number of partitions in order to get good performance.
 But *coalesce* can't guarantee that a child partition will be executed on the 
 same executor as its parent partitions, and this can lead to a large amount of 
 network transfer.
 In some scenarios, such as the +small and cached rdd+ case mentioned in the 
 title, we want to coalesce all the partitions on the same executor into one 
 partition and make sure the child partition is executed on that executor. This 
 avoids network transfer, reduces the number of tasks to schedule, and frees 
 the cpu cores for other jobs. 
 In this scenario, our performance improved by about 20% compared with before.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-6056) Unlimit offHeap memory use cause RM killing the container

2015-02-28 Thread SaintBacchus (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-6056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

SaintBacchus updated SPARK-6056:

Description: 
No matter whether `preferDirectBufs` is set or the number of threads is limited, 
Spark cannot limit the use of off-heap memory.
At line 269 of the class 'AbstractNioByteChannel' in netty-4.0.23.Final, Netty 
allocates an off-heap memory buffer of the same size as the on-heap one.
So however many buffers you want to transfer, the same amount of off-heap 
memory will be allocated.
But once the allocated memory reaches the capacity of the overhead memory set 
in YARN, this executor will be killed.
I wrote a simple piece of code to test it:
```test.scala
import org.apache.spark.storage._
import org.apache.spark._
val bufferRdd = sc.makeRDD(0 to 10, 10).map(x => new Array[Byte](10*1024*1024)).persist
bufferRdd.count
val part = bufferRdd.partitions(0)
val sparkEnv = SparkEnv.get
val blockMgr = sparkEnv.blockManager
def test = {
  val blockOption = blockMgr.get(RDDBlockId(bufferRdd.id, part.index))
  val resultIt = blockOption.get.data.asInstanceOf[Iterator[Array[Byte]]]
  val len = resultIt.map(_.length).sum
}

def test_driver(count: Int, parallel: Int)(f: => Unit) = {
  val tpool = new scala.concurrent.forkjoin.ForkJoinPool(parallel)
  val taskSupport = new scala.collection.parallel.ForkJoinTaskSupport(tpool)
  val parseq = (1 to count).par
  parseq.tasksupport = taskSupport
  parseq.foreach(x => f)
  tpool.shutdown
  tpool.awaitTermination(100, java.util.concurrent.TimeUnit.SECONDS)
}
```
progress:
1. bin/spark-shell --master yarn-client --executor-cores 40 --num-executors 1
2. :load test.scala in spark-shell
3. use the command {pid=$(jps|grep CoarseGrainedExecutorBackend |awk '{print $1}');top -b -p $pid|grep $pid} to watch the executor on the slave node
4. test_driver(20,100)(test) in spark-shell
5. watch the output of the command on the slave node

If multiple threads are used to get len, the physical memory will soon exceed 
the limit set by spark.yarn.executor.memoryOverhead.
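
A minimal mitigation sketch (my assumption, not a fix for the root cause): give the 
container more overhead room and ask Netty to prefer heap buffers. Both property 
names are existing Spark settings already mentioned in this issue; the values are 
only illustrative.
```scala
// Sketch only: enlarge the YARN overhead allowance and prefer on-heap Netty buffers.
// This does not bound the off-heap usage; it only raises the point at which the
// container gets killed.
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("offheap-overhead-test")
  .set("spark.yarn.executor.memoryOverhead", "1024")   // MB; the default used here was 384
  .set("spark.shuffle.io.preferDirectBufs", "false")   // hint Netty to use heap buffers
val sc = new SparkContext(conf)
```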

  was:
No matter whether `preferDirectBufs` is set or the number of threads is limited, 
Spark cannot limit the use of off-heap memory.
At line 269 of the class 'AbstractNioByteChannel' in netty-4.0.23.Final, Netty 
allocates an off-heap memory buffer of the same size as the on-heap one.
So however many buffers you want to transfer, the same amount of off-heap 
memory will be allocated.
But once the allocated memory reaches the capacity of the overhead memory set 
in YARN, this executor will be killed.
I wrote a simple piece of code to test it:
```test.scala
import org.apache.spark.storage._
import org.apache.spark._
val bufferRdd = sc.makeRDD(0 to 10, 10).map(x => new Array[Byte](10*1024*1024)).persist
bufferRdd.count
val part = bufferRdd.partitions(0)
val sparkEnv = SparkEnv.get
val blockMgr = sparkEnv.blockManager
def test = {
  val blockOption = blockMgr.get(RDDBlockId(bufferRdd.id, part.index))
  val resultIt = blockOption.get.data.asInstanceOf[Iterator[Array[Byte]]]
  val len = resultIt.map(_.length).sum
  println(s"[${Thread.currentThread.getId}] get block length = $len")
}

def test_driver(count: Int, parallel: Int)(f: => Unit) = {
  val tpool = new scala.concurrent.forkjoin.ForkJoinPool(parallel)
  val taskSupport = new scala.collection.parallel.ForkJoinTaskSupport(tpool)
  val parseq = (1 to count).par
  parseq.tasksupport = taskSupport
  parseq.foreach(x => f)
  tpool.shutdown
  tpool.awaitTermination(100, java.util.concurrent.TimeUnit.SECONDS)
}
```
progress:
1. bin/spark-shell --master yarn-client --executor-cores 40 --num-executors 1
2. :load test.scala in spark-shell
3. use the command {pid=$(jps|grep CoarseGrainedExecutorBackend |awk '{print $1}');top -b -p $pid|grep $pid} to watch the executor on the slave node
4. test_driver(20,100)(test) in spark-shell
5. watch the output of the command on the slave node

If multiple threads are used to get len, the physical memory will soon exceed 
the limit set by spark.yarn.executor.memoryOverhead.


 Unlimit offHeap memory use cause RM killing the container
 -

 Key: SPARK-6056
 URL: https://issues.apache.org/jira/browse/SPARK-6056
 Project: Spark
  Issue Type: Bug
  Components: Shuffle, Spark Core
Affects Versions: 1.2.1
Reporter: SaintBacchus

 No matter whether `preferDirectBufs` is set or the number of threads is 
 limited, Spark cannot limit the use of off-heap memory.
 At line 269 of the class 'AbstractNioByteChannel' in netty-4.0.23.Final, 
 Netty allocates an off-heap memory buffer of the same size as the on-heap one.
 So however many buffers you want to transfer, the same amount of off-heap 
 memory will be allocated.
 But once the allocated memory reaches the capacity of the overhead memory set 
 in YARN, this executor will be killed.
 I wrote a simple piece of code to test it:
 ```test.scala
 import 

[jira] [Updated] (SPARK-6056) Unlimit offHeap memory use cause RM killing the container

2015-02-28 Thread SaintBacchus (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-6056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

SaintBacchus updated SPARK-6056:

Description: 
No matter whether `preferDirectBufs` is set or the number of threads is limited, 
Spark cannot limit the use of off-heap memory.
At line 269 of the class 'AbstractNioByteChannel' in netty-4.0.23.Final, Netty 
allocates an off-heap memory buffer of the same size as the on-heap one.
So however many buffers you want to transfer, the same amount of off-heap 
memory will be allocated.
But once the allocated memory reaches the capacity of the overhead memory set 
in YARN, this executor will be killed.
I wrote a simple piece of code to test it:
```test.scala
import org.apache.spark.storage._
import org.apache.spark._
val bufferRdd = sc.makeRDD(0 to 10, 10).map(x => new Array[Byte](10*1024*1024)).persist
bufferRdd.count
val part = bufferRdd.partitions(0)
val sparkEnv = SparkEnv.get
val blockMgr = sparkEnv.blockManager
def test = {
  val blockOption = blockMgr.get(RDDBlockId(bufferRdd.id, part.index))
  val resultIt = blockOption.get.data.asInstanceOf[Iterator[Array[Byte]]]
  val len = resultIt.map(_.length).sum
  println(s"[${Thread.currentThread.getId}] get block length = $len")
}

def test_driver(count: Int, parallel: Int)(f: => Unit) = {
  val tpool = new scala.concurrent.forkjoin.ForkJoinPool(parallel)
  val taskSupport = new scala.collection.parallel.ForkJoinTaskSupport(tpool)
  val parseq = (1 to count).par
  parseq.tasksupport = taskSupport
  parseq.foreach(x => f)
  tpool.shutdown
  tpool.awaitTermination(100, java.util.concurrent.TimeUnit.SECONDS)
}
```
progress:
1. bin/spark-shell --master yarn-client --executor-cores 40 --num-executors 1
2. :load test.scala in spark-shell
3. use the command {pid=$(jps|grep CoarseGrainedExecutorBackend |awk '{print $1}');top -b -p $pid|grep $pid} to watch the executor on the slave node
4. test_driver(20,100)(test) in spark-shell
5. watch the output of the command on the slave node

If multiple threads are used to get len, the physical memory will soon exceed 
the limit set by spark.yarn.executor.memoryOverhead.

  was:
No matter whether `preferDirectBufs` is set or the number of threads is limited, 
Spark cannot limit the use of off-heap memory.
At line 269 of the class 'AbstractNioByteChannel' in netty-4.0.23.Final, Netty 
allocates an off-heap memory buffer of the same size as the on-heap one.
So however many buffers you want to transfer, the same amount of off-heap 
memory will be allocated.
But once the allocated memory reaches the capacity of the overhead memory set 
in YARN, this executor will be killed.
I wrote a simple piece of code to test it:
```scala
val bufferRdd = sc.makeRDD(0 to 10, 10).map(x => new Array[Byte](10*1024*1024)).persist
bufferRdd.count
val part = bufferRdd.partitions(0)
val sparkEnv = SparkEnv.get
val blockMgr = sparkEnv.blockManager
val blockOption = blockMgr.get(RDDBlockId(bufferRdd.id, part.index))
val resultIt = blockOption.get.data.asInstanceOf[Iterator[Array[Byte]]]
val len = resultIt.map(_.length).sum
```
If multiple threads are used to get len, the physical memory will soon exceed 
the limit set by spark.yarn.executor.memoryOverhead.


 Unlimit offHeap memory use cause RM killing the container
 -

 Key: SPARK-6056
 URL: https://issues.apache.org/jira/browse/SPARK-6056
 Project: Spark
  Issue Type: Bug
  Components: Shuffle, Spark Core
Affects Versions: 1.2.1
Reporter: SaintBacchus

 No matter whether `preferDirectBufs` is set or the number of threads is 
 limited, Spark cannot limit the use of off-heap memory.
 At line 269 of the class 'AbstractNioByteChannel' in netty-4.0.23.Final, 
 Netty allocates an off-heap memory buffer of the same size as the on-heap one.
 So however many buffers you want to transfer, the same amount of off-heap 
 memory will be allocated.
 But once the allocated memory reaches the capacity of the overhead memory set 
 in YARN, this executor will be killed.
 I wrote a simple piece of code to test it:
 ```test.scala
 import org.apache.spark.storage._
 import org.apache.spark._
 val bufferRdd = sc.makeRDD(0 to 10, 10).map(x => new Array[Byte](10*1024*1024)).persist
 bufferRdd.count
 val part =  bufferRdd.partitions(0)
 val sparkEnv = SparkEnv.get
 val blockMgr = sparkEnv.blockManager
 def test = {
 val blockOption = blockMgr.get(RDDBlockId(bufferRdd.id, part.index))
 val resultIt = 
 blockOption.get.data.asInstanceOf[Iterator[Array[Byte]]]
 val len = resultIt.map(_.length).sum
 println(s"[${Thread.currentThread.getId}] get block length = $len")
 }
 def test_driver(count: Int, parallel: Int)(f: => Unit) = {
 val tpool = new scala.concurrent.forkjoin.ForkJoinPool(parallel)
 val taskSupport  = new 
 scala.collection.parallel.ForkJoinTaskSupport(tpool)
 val parseq = (1 to count).par
 parseq.tasksupport = taskSupport
 

[jira] [Updated] (SPARK-6056) Unlimit offHeap memory use cause RM killing the container

2015-02-28 Thread SaintBacchus (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-6056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

SaintBacchus updated SPARK-6056:

Description: 
No matter whether `preferDirectBufs` is set or the number of threads is limited, 
Spark cannot limit the use of off-heap memory.
At line 269 of the class 'AbstractNioByteChannel' in netty-4.0.23.Final, Netty 
allocates an off-heap memory buffer of the same size as the on-heap one.
So however many buffers you want to transfer, the same amount of off-heap 
memory will be allocated.
But once the allocated memory reaches the capacity of the overhead memory set 
in YARN, this executor will be killed.
I wrote a simple piece of code to test it:
{code:title=test.scala|borderStyle=solid}
import org.apache.spark.storage._
import org.apache.spark._
val bufferRdd = sc.makeRDD(0 to 10, 10).map(x => new Array[Byte](10*1024*1024)).persist
bufferRdd.count
val part = bufferRdd.partitions(0)
val sparkEnv = SparkEnv.get
val blockMgr = sparkEnv.blockManager
def test = {
  val blockOption = blockMgr.get(RDDBlockId(bufferRdd.id, part.index))
  val resultIt = blockOption.get.data.asInstanceOf[Iterator[Array[Byte]]]
  val len = resultIt.map(_.length).sum
  println(s"[${Thread.currentThread.getId}] get block length = $len")
}

def test_driver(count: Int, parallel: Int)(f: => Unit) = {
  val tpool = new scala.concurrent.forkjoin.ForkJoinPool(parallel)
  val taskSupport = new scala.collection.parallel.ForkJoinTaskSupport(tpool)
  val parseq = (1 to count).par
  parseq.tasksupport = taskSupport
  parseq.foreach(x => f)
  tpool.shutdown
  tpool.awaitTermination(100, java.util.concurrent.TimeUnit.SECONDS)
}
{code}
progress:
1. bin/spark-shell --master yarn-client --executor-cores 40 --num-executors 1
2. :load test.scala in spark-shell
3. use such a command to watch the executor on the slave node
{code}
pid=$(jps|grep CoarseGrainedExecutorBackend |awk '{print $1}');top -b -p 
$pid|grep $pid
{code}
4. test_driver(20,100)(test) in spark-shell
5. watch the output of the command on the slave node

If multiple threads are used to get len, the physical memory will soon exceed 
the limit set by spark.yarn.executor.memoryOverhead.

  was:
No matter whether `preferDirectBufs` is set or the number of threads is limited, 
Spark cannot limit the use of off-heap memory.
At line 269 of the class 'AbstractNioByteChannel' in netty-4.0.23.Final, Netty 
allocates an off-heap memory buffer of the same size as the on-heap one.
So however many buffers you want to transfer, the same amount of off-heap 
memory will be allocated.
But once the allocated memory reaches the capacity of the overhead memory set 
in YARN, this executor will be killed.
I wrote a simple piece of code to test it:
{code:title=test.scala|borderStyle=solid}
import org.apache.spark.storage._
import org.apache.spark._
val bufferRdd = sc.makeRDD(0 to 10, 10).map(x => new Array[Byte](10*1024*1024)).persist
bufferRdd.count
val part = bufferRdd.partitions(0)
val sparkEnv = SparkEnv.get
val blockMgr = sparkEnv.blockManager
def test = {
  val blockOption = blockMgr.get(RDDBlockId(bufferRdd.id, part.index))
  val resultIt = blockOption.get.data.asInstanceOf[Iterator[Array[Byte]]]
  val len = resultIt.map(_.length).sum
  println(s"[${Thread.currentThread.getId}] get block length = $len")
}

def test_driver(count: Int, parallel: Int)(f: => Unit) = {
  val tpool = new scala.concurrent.forkjoin.ForkJoinPool(parallel)
  val taskSupport = new scala.collection.parallel.ForkJoinTaskSupport(tpool)
  val parseq = (1 to count).par
  parseq.tasksupport = taskSupport
  parseq.foreach(x => f)
  tpool.shutdown
  tpool.awaitTermination(100, java.util.concurrent.TimeUnit.SECONDS)
}
{code}
progress:
1. bin/spark-shell --master yarn-client --executor-cores 40 --num-executors 1
2. :load test.scala in spark-shell
3. use the command {pid=$(jps|grep CoarseGrainedExecutorBackend |awk '{print $1}');top -b -p $pid|grep $pid} to watch the executor on the slave node
4. test_driver(20,100)(test) in spark-shell
5. watch the output of the command on the slave node

If multiple threads are used to get len, the physical memory will soon exceed 
the limit set by spark.yarn.executor.memoryOverhead.


 Unlimit offHeap memory use cause RM killing the container
 -

 Key: SPARK-6056
 URL: https://issues.apache.org/jira/browse/SPARK-6056
 Project: Spark
  Issue Type: Bug
  Components: Shuffle, Spark Core
Affects Versions: 1.2.1
Reporter: SaintBacchus

 No matter whether `preferDirectBufs` is set or the number of threads is 
 limited, Spark cannot limit the use of off-heap memory.
 At line 269 of the class 'AbstractNioByteChannel' in netty-4.0.23.Final, 
 Netty allocates an off-heap memory buffer of the same size as the on-heap one.
 So however many buffers you want to transfer, the same amount of off-heap 
 memory will be allocated.
 But once the allocated memory 

[jira] [Updated] (SPARK-6056) Unlimit offHeap memory use cause RM killing the container

2015-02-28 Thread SaintBacchus (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-6056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

SaintBacchus updated SPARK-6056:

Description: 
No matter whether `preferDirectBufs` is set or the number of threads is limited, 
Spark cannot limit the use of off-heap memory.
At line 269 of the class 'AbstractNioByteChannel' in netty-4.0.23.Final, Netty 
allocates an off-heap memory buffer of the same size as the on-heap one.
So however many buffers you want to transfer, the same amount of off-heap 
memory will be allocated.
But once the allocated memory reaches the capacity of the overhead memory set 
in YARN, this executor will be killed.
I wrote a simple piece of code to test it:
{code:title=test.scala|borderStyle=solid}
import org.apache.spark.storage._
import org.apache.spark._
val bufferRdd = sc.makeRDD(0 to 10, 10).map(x => new Array[Byte](10*1024*1024)).persist
bufferRdd.count
val part = bufferRdd.partitions(0)
val sparkEnv = SparkEnv.get
val blockMgr = sparkEnv.blockManager
def test = {
  val blockOption = blockMgr.get(RDDBlockId(bufferRdd.id, part.index))
  val resultIt = blockOption.get.data.asInstanceOf[Iterator[Array[Byte]]]
  val len = resultIt.map(_.length).sum
  println(s"[${Thread.currentThread.getId}] get block length = $len")
}

def test_driver(count: Int, parallel: Int)(f: => Unit) = {
  val tpool = new scala.concurrent.forkjoin.ForkJoinPool(parallel)
  val taskSupport = new scala.collection.parallel.ForkJoinTaskSupport(tpool)
  val parseq = (1 to count).par
  parseq.tasksupport = taskSupport
  parseq.foreach(x => f)
  tpool.shutdown
  tpool.awaitTermination(100, java.util.concurrent.TimeUnit.SECONDS)
}
{code}
progress:
1. bin/spark-shell --master yarn-client --executor-cores 40 --num-executors 1
2. :load test.scala in spark-shell
3. use the command {pid=$(jps|grep CoarseGrainedExecutorBackend |awk '{print $1}');top -b -p $pid|grep $pid} to watch the executor on the slave node
4. test_driver(20,100)(test) in spark-shell
5. watch the output of the command on the slave node

If multiple threads are used to get len, the physical memory will soon exceed 
the limit set by spark.yarn.executor.memoryOverhead.

  was:
No matter whether `preferDirectBufs` is set or the number of threads is limited, 
Spark cannot limit the use of off-heap memory.
At line 269 of the class 'AbstractNioByteChannel' in netty-4.0.23.Final, Netty 
allocates an off-heap memory buffer of the same size as the on-heap one.
So however many buffers you want to transfer, the same amount of off-heap 
memory will be allocated.
But once the allocated memory reaches the capacity of the overhead memory set 
in YARN, this executor will be killed.
I wrote a simple piece of code to test it:

progress:
1. bin/spark-shell --master yarn-client --executor-cores 40 --num-executors 1
2. :load test.scala in spark-shell
3. use the command {pid=$(jps|grep CoarseGrainedExecutorBackend |awk '{print $1}');top -b -p $pid|grep $pid} to watch the executor on the slave node
4. test_driver(20,100)(test) in spark-shell
5. watch the output of the command on the slave node

If multiple threads are used to get len, the physical memory will soon exceed 
the limit set by spark.yarn.executor.memoryOverhead.


 Unlimit offHeap memory use cause RM killing the container
 -

 Key: SPARK-6056
 URL: https://issues.apache.org/jira/browse/SPARK-6056
 Project: Spark
  Issue Type: Bug
  Components: Shuffle, Spark Core
Affects Versions: 1.2.1
Reporter: SaintBacchus

 No matter whether `preferDirectBufs` is set or the number of threads is 
 limited, Spark cannot limit the use of off-heap memory.
 At line 269 of the class 'AbstractNioByteChannel' in netty-4.0.23.Final, 
 Netty allocates an off-heap memory buffer of the same size as the on-heap one.
 So however many buffers you want to transfer, the same amount of off-heap 
 memory will be allocated.
 But once the allocated memory reaches the capacity of the overhead memory set 
 in YARN, this executor will be killed.
 I wrote a simple piece of code to test it:
 {code:title=test.scala|borderStyle=solid}
 import org.apache.spark.storage._
 import org.apache.spark._
 val bufferRdd = sc.makeRDD(0 to 10, 10).map(x => new Array[Byte](10*1024*1024)).persist
 bufferRdd.count
 val part =  bufferRdd.partitions(0)
 val sparkEnv = SparkEnv.get
 val blockMgr = sparkEnv.blockManager
 def test = {
 val blockOption = blockMgr.get(RDDBlockId(bufferRdd.id, part.index))
 val resultIt = 
 blockOption.get.data.asInstanceOf[Iterator[Array[Byte]]]
 val len = resultIt.map(_.length).sum
 println(s"[${Thread.currentThread.getId}] get block length = $len")
 }
 def test_driver(count: Int, parallel: Int)(f: => Unit) = {
 val tpool = new scala.concurrent.forkjoin.ForkJoinPool(parallel)
 val taskSupport  = new 
 scala.collection.parallel.ForkJoinTaskSupport(tpool)
 val parseq = (1 to count).par
 parseq.tasksupport = 

[jira] [Comment Edited] (SPARK-6056) Unlimit offHeap memory use cause RM killing the container

2015-02-28 Thread SaintBacchus (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14341957#comment-14341957
 ] 

SaintBacchus edited comment on SPARK-6056 at 3/1/15 6:01 AM:
-

[~adav] Thanks for the comment. I don't fully understand what you mean. Do you 
mean that if 'spark.shuffle.io.preferDirectBufs' is set to false, Netty will 
only allocate heap memory? Is that right?
I tested it again, and I have updated the test program in the Description.
I had set _spark.shuffle.io.preferDirectBufs_ and, when I typed 
*test_driver(20,100)(test)*, the result was this:
_ 76602 root  20   0 1597m 339m  25m S0  0.1   0:04.89 java   _
 _76602 root  20   0 {color:red} 1777m {color} 1.0g  26m S   99  0.3   
0:07.88 java   _
 
_ 76602 root  20   0 1597m 880m  26m S4  0.3   0:07.99 java_

The red number is the virtual memory; it rose by about 180 MB at that moment, 
while in total 200 MB of data (20 * 10 MB) was transferred from the executor to 
the driver.
I think it's a big problem. If I use 40 threads to get the result, it will need 
nearly 400 MB of memory and so exceed the YARN limit, and the executor will 
finally be killed by YARN.
If there were a way to limit the peak use of memory, that would be fine. In 
addition, I think the number of remote block-fetch threads on the user side is 
uncontrollable, so it would be better to control it inside Spark.
[~lianhuiwang] I use the recent Spark from GitHub and I also tested the 1.2.0 
release version.
In my test case, I use the default memory: a 1 GB executor and 384 MB of 
overhead. But in real cases, the memory is much larger.
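
To make the "controlled" idea concrete, here is a small user-side sketch that caps 
the peak by bounding in-flight fetches with a semaphore. It is only an illustration 
(it assumes the test and test_driver helpers from the Description above), not a 
proposal for Spark's internals.
{code:title=bounded-fetch-sketch.scala|borderStyle=solid}
import java.util.concurrent.Semaphore

// Allow at most maxInFlight concurrent block fetches, so at most
// maxInFlight * 10 MB of transfer buffers are alive at the same time.
def boundedTestDriver(count: Int, parallel: Int, maxInFlight: Int)(f: => Unit) = {
  val permits = new Semaphore(maxInFlight)
  def guarded(): Unit = {
    permits.acquire()
    try f finally permits.release()
  }
  test_driver(count, parallel)(guarded())
}

// e.g. boundedTestDriver(20, 100, 4)(test)   // peak of roughly 4 * 10 MB of buffers
{code}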


was (Author: carlmartin):
[~adav] Thanks for the comment. I don't fully understand what you mean. Do you 
mean that if 'spark.shuffle.io.preferDirectBufs' is set to false, Netty will 
only allocate heap memory? Is that right?
I tested it again, and I have updated the test program in the Description.
I had set _spark.shuffle.io.preferDirectBufs_ and, when I typed 
*test_driver(20,100)(test)*, the result was this:
_ 76602 root  20   0 1597m 339m  25m S0  0.1   0:04.89 java   _
 _76602 root  20   0 {color:red} 1777m {color} 1.0g  26m S   99  0.3   
0:07.88 java   _
 
_ 76602 root  20   0 1597m 880m  26m S4  0.3   0:07.99 java_

The red number rose by about 180 MB at that moment, while in total 200 MB of 
data (20 * 10 MB) was transferred from the executor to the driver.
I think it's a big problem. If I use 40 threads to get the result, it will need 
nearly 400 MB of memory and so exceed the YARN limit, and the executor will 
finally be killed by YARN.
If there were a way to limit the peak use of memory, that would be fine. In 
addition, I think the number of remote block-fetch threads on the user side is 
uncontrollable, so it would be better to control it inside Spark.
[~lianhuiwang] I use the recent Spark from GitHub and I also tested the 1.2.0 
release version.
In my test case, I use the default memory: a 1 GB executor and 384 MB of 
overhead. But in real cases, the memory is much larger.

 Unlimit offHeap memory use cause RM killing the container
 -

 Key: SPARK-6056
 URL: https://issues.apache.org/jira/browse/SPARK-6056
 Project: Spark
  Issue Type: Bug
  Components: Shuffle, Spark Core
Affects Versions: 1.2.1
Reporter: SaintBacchus

 No matter whether `preferDirectBufs` is set or the number of threads is 
 limited, Spark cannot limit the use of off-heap memory.
 At line 269 of the class 'AbstractNioByteChannel' in netty-4.0.23.Final, 
 Netty allocates an off-heap memory buffer of the same size as the on-heap one.
 So however many buffers you want to transfer, the same amount of off-heap 
 memory will be allocated.
 But once the allocated memory reaches the capacity of the overhead memory set 
 in YARN, this executor will be killed.
 I wrote a simple piece of code to test it:
 {code:title=test.scala|borderStyle=solid}
 import org.apache.spark.storage._
 import org.apache.spark._
 val bufferRdd = sc.makeRDD(0 to 10, 10).map(x => new Array[Byte](10*1024*1024)).persist
 bufferRdd.count
 val part =  bufferRdd.partitions(0)
 val sparkEnv = SparkEnv.get
 val blockMgr = sparkEnv.blockManager
 def test = {
 val blockOption = blockMgr.get(RDDBlockId(bufferRdd.id, part.index))
 val resultIt = 
 blockOption.get.data.asInstanceOf[Iterator[Array[Byte]]]
 val len = resultIt.map(_.length).sum
 println(s"[${Thread.currentThread.getId}] get block length = $len")
 }
 def test_driver(count: Int, parallel: Int)(f: => Unit) = {
 val tpool = new scala.concurrent.forkjoin.ForkJoinPool(parallel)
 val taskSupport  = new 
 scala.collection.parallel.ForkJoinTaskSupport(tpool)
 val parseq = (1 to 

[jira] [Commented] (SPARK-6056) Unlimit offHeap memory use cause RM killing the container

2015-02-28 Thread SaintBacchus (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14341957#comment-14341957
 ] 

SaintBacchus commented on SPARK-6056:
-

[~adav] Thanks for the comment. I don't fully understand what you mean. Do you 
mean that if 'spark.shuffle.io.preferDirectBufs' is set to false, Netty will 
only allocate heap memory? Is that right?
I tested it again, and I have updated the test program in the Description.
I had set _spark.shuffle.io.preferDirectBufs_ and, when I typed 
*test_driver(20,100)(test)*, the result was this:
_ 76602 root  20   0 1597m 339m  25m S0  0.1   0:04.89 java   _
 _76602 root  20   0 {color:red} 1777m {color} 1.0g  26m S   99  0.3   
0:07.88 java   _
 
_ 76602 root  20   0 1597m 880m  26m S4  0.3   0:07.99 java_

The red number rose by about 180 MB at that moment, while in total 200 MB of 
data (20 * 10 MB) was transferred from the executor to the driver.
I think it's a big problem. If I use 40 threads to get the result, it will need 
nearly 400 MB of memory and so exceed the YARN limit, and the executor will 
finally be killed by YARN.
If there were a way to limit the peak use of memory, that would be fine. In 
addition, I think the number of remote block-fetch threads on the user side is 
uncontrollable, so it would be better to control it inside Spark.
[~lianhuiwang] I use the recent Spark from GitHub and I also tested the 1.2.0 
release version.
In my test case, I use the default memory: a 1 GB executor and 384 MB of 
overhead. But in real cases, the memory is much larger.

 Unlimit offHeap memory use cause RM killing the container
 -

 Key: SPARK-6056
 URL: https://issues.apache.org/jira/browse/SPARK-6056
 Project: Spark
  Issue Type: Bug
  Components: Shuffle, Spark Core
Affects Versions: 1.2.1
Reporter: SaintBacchus

 No matter whether `preferDirectBufs` is set or the number of threads is 
 limited, Spark cannot limit the use of off-heap memory.
 At line 269 of the class 'AbstractNioByteChannel' in netty-4.0.23.Final, 
 Netty allocates an off-heap memory buffer of the same size as the on-heap one.
 So however many buffers you want to transfer, the same amount of off-heap 
 memory will be allocated.
 But once the allocated memory reaches the capacity of the overhead memory set 
 in YARN, this executor will be killed.
 I wrote a simple piece of code to test it:
 {code:title=test.scala|borderStyle=solid}
 import org.apache.spark.storage._
 import org.apache.spark._
 val bufferRdd = sc.makeRDD(0 to 10, 10).map(x => new Array[Byte](10*1024*1024)).persist
 bufferRdd.count
 val part =  bufferRdd.partitions(0)
 val sparkEnv = SparkEnv.get
 val blockMgr = sparkEnv.blockManager
 def test = {
 val blockOption = blockMgr.get(RDDBlockId(bufferRdd.id, part.index))
 val resultIt = 
 blockOption.get.data.asInstanceOf[Iterator[Array[Byte]]]
 val len = resultIt.map(_.length).sum
 println(s"[${Thread.currentThread.getId}] get block length = $len")
 }
 def test_driver(count: Int, parallel: Int)(f: => Unit) = {
 val tpool = new scala.concurrent.forkjoin.ForkJoinPool(parallel)
 val taskSupport  = new 
 scala.collection.parallel.ForkJoinTaskSupport(tpool)
 val parseq = (1 to count).par
 parseq.tasksupport = taskSupport
 parseq.foreach(x => f)
 tpool.shutdown
 tpool.awaitTermination(100, java.util.concurrent.TimeUnit.SECONDS)
 }
 {code}
 progress:
 1. bin/spark-shell --master yarn-client --executor-cores 40 --num-executors 1
 2. :load test.scala in spark-shell
 3. use such a command to watch the executor on the slave node
 {code}
 pid=$(jps|grep CoarseGrainedExecutorBackend |awk '{print $1}');top -b -p 
 $pid|grep $pid
 {code}
 4. test_driver(20,100)(test) in spark-shell
 5. watch the output of the command on slave node
 If multiple threads are used to get len, the physical memory will soon exceed 
 the limit set by spark.yarn.executor.memoryOverhead.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-6056) Unlimit offHeap memory use cause RM killing the container

2015-02-28 Thread SaintBacchus (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14341957#comment-14341957
 ] 

SaintBacchus edited comment on SPARK-6056 at 3/1/15 6:03 AM:
-

[~adav] Thanks for the comment. I don't fully understand what you mean. Do you 
mean that if 'spark.shuffle.io.preferDirectBufs' is set to false, Netty will 
only allocate heap memory? Is that right?
I tested it again, and I have updated the test program in the Description.
I had set _spark.shuffle.io.preferDirectBufs_ and, when I typed 
*test_driver(20,100)(test)*, the result was this:
_ 76602 root  20   0 1597m 339m  25m S0  0.1   0:04.89 java   _
 _76602 root  20   0 {color:red} 1777m {color} 1.0g  26m S   99  0.3   
0:07.88 java   _
 
_ 76602 root  20   0 1597m 880m  26m S4  0.3   0:07.99 java_

The red number is the virtual memory; it rose by about 180 MB at that moment, 
while in total 200 MB of data (20 * 10 MB) was transferred from the executor to 
the driver.
I think it's a problem. If I use 40 threads to get the result, it will need 
nearly 400 MB of memory and soon exceed the YARN limit, and the executor will 
finally be killed by YARN.
If there were a way to limit the peak use of memory, that would be fine. In 
addition, I think the number of remote block-fetch threads on the user side is 
uncontrollable, so it would be better to control it inside Spark.
[~lianhuiwang] I use the recent Spark from GitHub and I also tested the 1.2.0 
release version.
In my test case, I use the default memory: a 1 GB executor and 384 MB of 
overhead. But in real cases, the memory is much larger.


was (Author: carlmartin):
[~adav] Thanks for the comment. I don't fully understand what you mean. Do you 
mean that if 'spark.shuffle.io.preferDirectBufs' is set to false, Netty will 
only allocate heap memory? Is that right?
I tested it again, and I have updated the test program in the Description.
I had set _spark.shuffle.io.preferDirectBufs_ and, when I typed 
*test_driver(20,100)(test)*, the result was this:
_ 76602 root  20   0 1597m 339m  25m S0  0.1   0:04.89 java   _
 _76602 root  20   0 {color:red} 1777m {color} 1.0g  26m S   99  0.3   
0:07.88 java   _
 
_ 76602 root  20   0 1597m 880m  26m S4  0.3   0:07.99 java_

The red number is the virtual memory; it rose by about 180 MB at that moment, 
while in total 200 MB of data (20 * 10 MB) was transferred from the executor to 
the driver.
I think it's a big problem. If I use 40 threads to get the result, it will need 
nearly 400 MB of memory and so exceed the YARN limit, and the executor will 
finally be killed by YARN.
If there were a way to limit the peak use of memory, that would be fine. In 
addition, I think the number of remote block-fetch threads on the user side is 
uncontrollable, so it would be better to control it inside Spark.
[~lianhuiwang] I use the recent Spark from GitHub and I also tested the 1.2.0 
release version.
In my test case, I use the default memory: a 1 GB executor and 384 MB of 
overhead. But in real cases, the memory is much larger.

 Unlimit offHeap memory use cause RM killing the container
 -

 Key: SPARK-6056
 URL: https://issues.apache.org/jira/browse/SPARK-6056
 Project: Spark
  Issue Type: Bug
  Components: Shuffle, Spark Core
Affects Versions: 1.2.1
Reporter: SaintBacchus

 No matter whether `preferDirectBufs` is set or the number of threads is 
 limited, Spark cannot limit the use of off-heap memory.
 At line 269 of the class 'AbstractNioByteChannel' in netty-4.0.23.Final, 
 Netty allocates an off-heap memory buffer of the same size as the on-heap one.
 So however many buffers you want to transfer, the same amount of off-heap 
 memory will be allocated.
 But once the allocated memory reaches the capacity of the overhead memory set 
 in YARN, this executor will be killed.
 I wrote a simple piece of code to test it:
 {code:title=test.scala|borderStyle=solid}
 import org.apache.spark.storage._
 import org.apache.spark._
 val bufferRdd = sc.makeRDD(0 to 10, 10).map(x => new Array[Byte](10*1024*1024)).persist
 bufferRdd.count
 val part =  bufferRdd.partitions(0)
 val sparkEnv = SparkEnv.get
 val blockMgr = sparkEnv.blockManager
 def test = {
 val blockOption = blockMgr.get(RDDBlockId(bufferRdd.id, part.index))
 val resultIt = 
 blockOption.get.data.asInstanceOf[Iterator[Array[Byte]]]
 val len = resultIt.map(_.length).sum
 println(s"[${Thread.currentThread.getId}] get block length = $len")
 }
 def test_driver(count: Int, parallel: Int)(f: => Unit) = {
 val tpool = new scala.concurrent.forkjoin.ForkJoinPool(parallel)
 val taskSupport  = new 
 scala.collection.parallel.ForkJoinTaskSupport(tpool)

[jira] [Comment Edited] (SPARK-6056) Unlimit offHeap memory use cause RM killing the container

2015-02-28 Thread SaintBacchus (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14341957#comment-14341957
 ] 

SaintBacchus edited comment on SPARK-6056 at 3/1/15 6:04 AM:
-

[~adav] Thanks for the comment. I don't fully understand what you mean. Do you 
mean that if 'spark.shuffle.io.preferDirectBufs' is set to false, Netty will 
only allocate heap memory? Is that right?
I tested it again, and I have updated the test program in the Description.
I had set _spark.shuffle.io.preferDirectBufs_ and, when I typed 
*test_driver(20,100)(test)*, the result was this:
_ 76602 root  20   0 1597m 339m  25m S0  0.1   0:04.89 java   _
 _76602 root  20   0 {color:red} 1777m {color} 1.0g  26m S   99  0.3   
0:07.88 java   _
 
_ 76602 root  20   0 1597m 880m  26m S4  0.3   0:07.99 java_

The red number is the virtual memory; it rose by about 180 MB at that moment, 
and the test case transferred 200 MB of data (20 * 10 MB) in total from the 
executor to the driver.
I think it's a problem. If I use 40 threads to get the result, it will need 
nearly 400 MB of memory and soon exceed the YARN limit, and the executor will 
finally be killed by YARN.
If there were a way to limit the peak use of memory, that would be fine. In 
addition, I think the number of remote block-fetch threads on the user side is 
uncontrollable, so it would be better to control it inside Spark.
[~lianhuiwang] I use the recent Spark from GitHub and I also tested the 1.2.0 
release version.
In my test case, I use the default memory: a 1 GB executor and 384 MB of 
overhead. But in real cases, the memory is much larger.


was (Author: carlmartin):
[~adav] Thanks for the comment. I don't fully understand what you mean. Do you 
mean that if 'spark.shuffle.io.preferDirectBufs' is set to false, Netty will 
only allocate heap memory? Is that right?
I tested it again, and I have updated the test program in the Description.
I had set _spark.shuffle.io.preferDirectBufs_ and, when I typed 
*test_driver(20,100)(test)*, the result was this:
_ 76602 root  20   0 1597m 339m  25m S0  0.1   0:04.89 java   _
 _76602 root  20   0 {color:red} 1777m {color} 1.0g  26m S   99  0.3   
0:07.88 java   _
 
_ 76602 root  20   0 1597m 880m  26m S4  0.3   0:07.99 java_

The red number is the virtual memory; it rose by about 180 MB at that moment, 
while in total 200 MB of data (20 * 10 MB) was transferred from the executor to 
the driver.
I think it's a problem. If I use 40 threads to get the result, it will need 
nearly 400 MB of memory and soon exceed the YARN limit, and the executor will 
finally be killed by YARN.
If there were a way to limit the peak use of memory, that would be fine. In 
addition, I think the number of remote block-fetch threads on the user side is 
uncontrollable, so it would be better to control it inside Spark.
[~lianhuiwang] I use the recent Spark from GitHub and I also tested the 1.2.0 
release version.
In my test case, I use the default memory: a 1 GB executor and 384 MB of 
overhead. But in real cases, the memory is much larger.

 Unlimit offHeap memory use cause RM killing the container
 -

 Key: SPARK-6056
 URL: https://issues.apache.org/jira/browse/SPARK-6056
 Project: Spark
  Issue Type: Bug
  Components: Shuffle, Spark Core
Affects Versions: 1.2.1
Reporter: SaintBacchus

 No matter whether `preferDirectBufs` is set or the number of threads is 
 limited, Spark cannot limit the use of off-heap memory.
 At line 269 of the class 'AbstractNioByteChannel' in netty-4.0.23.Final, 
 Netty allocates an off-heap memory buffer of the same size as the on-heap one.
 So however many buffers you want to transfer, the same amount of off-heap 
 memory will be allocated.
 But once the allocated memory reaches the capacity of the overhead memory set 
 in YARN, this executor will be killed.
 I wrote a simple piece of code to test it:
 {code:title=test.scala|borderStyle=solid}
 import org.apache.spark.storage._
 import org.apache.spark._
 val bufferRdd = sc.makeRDD(0 to 10, 10).map(x => new Array[Byte](10*1024*1024)).persist
 bufferRdd.count
 val part =  bufferRdd.partitions(0)
 val sparkEnv = SparkEnv.get
 val blockMgr = sparkEnv.blockManager
 def test = {
 val blockOption = blockMgr.get(RDDBlockId(bufferRdd.id, part.index))
 val resultIt = 
 blockOption.get.data.asInstanceOf[Iterator[Array[Byte]]]
 val len = resultIt.map(_.length).sum
 println(s"[${Thread.currentThread.getId}] get block length = $len")
 }
 def test_driver(count: Int, parallel: Int)(f: => Unit) = {
 val tpool = new scala.concurrent.forkjoin.ForkJoinPool(parallel)
 val taskSupport  = new 
 

[jira] [Updated] (SPARK-6056) Unlimit offHeap memory use cause RM killing the container

2015-02-28 Thread SaintBacchus (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-6056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

SaintBacchus updated SPARK-6056:

Description: 
No matter whether `preferDirectBufs` is set or the number of threads is limited, 
Spark cannot limit the use of off-heap memory.
At line 269 of the class 'AbstractNioByteChannel' in netty-4.0.23.Final, Netty 
allocates an off-heap memory buffer of the same size as the on-heap one.
So however many buffers you want to transfer, the same amount of off-heap 
memory will be allocated.
But once the allocated memory reaches the capacity of the overhead memory set 
in YARN, this executor will be killed.
I wrote a simple piece of code to test it:
```test.scala
import org.apache.spark.storage._
import org.apache.spark._
val bufferRdd = sc.makeRDD(0 to 10, 10).map(x => new Array[Byte](10*1024*1024)).persist
bufferRdd.count
val part = bufferRdd.partitions(0)
val sparkEnv = SparkEnv.get
val blockMgr = sparkEnv.blockManager
def test = {
  val blockOption = blockMgr.get(RDDBlockId(bufferRdd.id, part.index))
  val resultIt = blockOption.get.data.asInstanceOf[Iterator[Array[Byte]]]
  val len = resultIt.map(_.length).sum
}

def test_driver(count: Int, parallel: Int)(f: => Unit) = {
  val tpool = new scala.concurrent.forkjoin.ForkJoinPool(parallel)
  val taskSupport = new scala.collection.parallel.ForkJoinTaskSupport(tpool)
  val parseq = (1 to count).par
  parseq.tasksupport = taskSupport
  parseq.foreach(x => f)
  tpool.shutdown
  tpool.awaitTermination(100, java.util.concurrent.TimeUnit.SECONDS)
}
```
progress:
1. bin/spark-shell --master yarn-client --executor-cores 40 --num-executors 1
2. :load test.scala in spark-shell
3. use the command {pid=$(jps|grep CoarseGrainedExecutorBackend |awk '{print $1}');top -b -p $pid|grep $pid} to watch the executor on the slave node
4. test_driver(20,100)(test) in spark-shell
5. watch the output of the command on the slave node

If multiple threads are used to get len, the physical memory will soon exceed 
the limit set by spark.yarn.executor.memoryOverhead.

  was:
No matter whether `preferDirectBufs` is set or the number of threads is limited, 
Spark cannot limit the use of off-heap memory.
At line 269 of the class 'AbstractNioByteChannel' in netty-4.0.23.Final, Netty 
allocates an off-heap memory buffer of the same size as the on-heap one.
So however many buffers you want to transfer, the same amount of off-heap 
memory will be allocated.
But once the allocated memory reaches the capacity of the overhead memory set 
in YARN, this executor will be killed.
I wrote a simple piece of code to test it:
```test.scala
import org.apache.spark.storage._
import org.apache.spark._
val bufferRdd = sc.makeRDD(0 to 10, 10).map(x => new Array[Byte](10*1024*1024)).persist
bufferRdd.count
val part = bufferRdd.partitions(0)
val sparkEnv = SparkEnv.get
val blockMgr = sparkEnv.blockManager
def test = {
  val blockOption = blockMgr.get(RDDBlockId(bufferRdd.id, part.index))
  val resultIt = blockOption.get.data.asInstanceOf[Iterator[Array[Byte]]]
  val len = resultIt.map(_.length).sum
}

def test_driver(count: Int, parallel: Int)(f: => Unit) = {
  val tpool = new scala.concurrent.forkjoin.ForkJoinPool(parallel)
  val taskSupport = new scala.collection.parallel.ForkJoinTaskSupport(tpool)
  val parseq = (1 to count).par
  parseq.tasksupport = taskSupport
  parseq.foreach(x => f)
  tpool.shutdown
  tpool.awaitTermination(100, java.util.concurrent.TimeUnit.SECONDS)
}
```
progress:
1. bin/spark-shell --master yarn-client --executor-cores 40 --num-executors 1
2. :load test.scala in spark-shell
3. use the command {pid=$(jps|grep CoarseGrainedExecutorBackend |awk '{print $1}');top -b -p $pid|grep $pid} to watch the executor on the slave node
4. test_driver(20,100)(test) in spark-shell
5. watch the output of the command on the slave node

If multiple threads are used to get len, the physical memory will soon exceed 
the limit set by spark.yarn.executor.memoryOverhead.


 Unlimit offHeap memory use cause RM killing the container
 -

 Key: SPARK-6056
 URL: https://issues.apache.org/jira/browse/SPARK-6056
 Project: Spark
  Issue Type: Bug
  Components: Shuffle, Spark Core
Affects Versions: 1.2.1
Reporter: SaintBacchus

 No matter whether `preferDirectBufs` is set or the number of threads is 
 limited, Spark cannot limit the use of off-heap memory.
 At line 269 of the class 'AbstractNioByteChannel' in netty-4.0.23.Final, 
 Netty allocates an off-heap memory buffer of the same size as the on-heap one.
 So however many buffers you want to transfer, the same amount of off-heap 
 memory will be allocated.
 But once the allocated memory reaches the capacity of the overhead memory set 
 in YARN, this executor will be killed.
 I wrote a simple piece of code to test it:
 ```test.scala
 import org.apache.spark.storage._
 import org.apache.spark._
 val bufferRdd = 

[jira] [Updated] (SPARK-6056) Unlimit offHeap memory use cause RM killing the container

2015-02-28 Thread SaintBacchus (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-6056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

SaintBacchus updated SPARK-6056:

Description: 
No matter whether `preferDirectBufs` is set or the number of threads is limited, 
Spark cannot limit the use of off-heap memory.
At line 269 of the class 'AbstractNioByteChannel' in netty-4.0.23.Final, Netty 
allocates an off-heap memory buffer of the same size as the on-heap one.
So however many buffers you want to transfer, the same amount of off-heap 
memory will be allocated.
But once the allocated memory reaches the capacity of the overhead memory set 
in YARN, this executor will be killed.
I wrote a simple piece of code to test it:

progress:
1. bin/spark-shell --master yarn-client --executor-cores 40 --num-executors 1
2. :load test.scala in spark-shell
3. use the command {pid=$(jps|grep CoarseGrainedExecutorBackend |awk '{print $1}');top -b -p $pid|grep $pid} to watch the executor on the slave node
4. test_driver(20,100)(test) in spark-shell
5. watch the output of the command on the slave node

If multiple threads are used to get len, the physical memory will soon exceed 
the limit set by spark.yarn.executor.memoryOverhead.

  was:
No matter whether `preferDirectBufs` is set or the number of threads is limited, 
Spark cannot limit the use of off-heap memory.
At line 269 of the class 'AbstractNioByteChannel' in netty-4.0.23.Final, Netty 
allocates an off-heap memory buffer of the same size as the on-heap one.
So however many buffers you want to transfer, the same amount of off-heap 
memory will be allocated.
But once the allocated memory reaches the capacity of the overhead memory set 
in YARN, this executor will be killed.
I wrote a simple piece of code to test it:
```test.scala
import org.apache.spark.storage._
import org.apache.spark._
val bufferRdd = sc.makeRDD(0 to 10, 10).map(x => new Array[Byte](10*1024*1024)).persist
bufferRdd.count
val part = bufferRdd.partitions(0)
val sparkEnv = SparkEnv.get
val blockMgr = sparkEnv.blockManager
def test = {
  val blockOption = blockMgr.get(RDDBlockId(bufferRdd.id, part.index))
  val resultIt = blockOption.get.data.asInstanceOf[Iterator[Array[Byte]]]
  val len = resultIt.map(_.length).sum
}

def test_driver(count: Int, parallel: Int)(f: => Unit) = {
  val tpool = new scala.concurrent.forkjoin.ForkJoinPool(parallel)
  val taskSupport = new scala.collection.parallel.ForkJoinTaskSupport(tpool)
  val parseq = (1 to count).par
  parseq.tasksupport = taskSupport
  parseq.foreach(x => f)
  tpool.shutdown
  tpool.awaitTermination(100, java.util.concurrent.TimeUnit.SECONDS)
}
```
progress:
1. bin/spark-shell --master yarn-client --executor-cores 40 --num-executors 1
2. :load test.scala in spark-shell
3. use the command {pid=$(jps|grep CoarseGrainedExecutorBackend |awk '{print $1}');top -b -p $pid|grep $pid} to watch the executor on the slave node
4. test_driver(20,100)(test) in spark-shell
5. watch the output of the command on the slave node

If multiple threads are used to get len, the physical memory will soon exceed 
the limit set by spark.yarn.executor.memoryOverhead.


 Unlimit offHeap memory use cause RM killing the container
 -

 Key: SPARK-6056
 URL: https://issues.apache.org/jira/browse/SPARK-6056
 Project: Spark
  Issue Type: Bug
  Components: Shuffle, Spark Core
Affects Versions: 1.2.1
Reporter: SaintBacchus

 No matter whether `preferDirectBufs` is set or the number of threads is 
 limited, Spark cannot limit the use of off-heap memory.
 At line 269 of the class 'AbstractNioByteChannel' in netty-4.0.23.Final, 
 Netty allocates an off-heap memory buffer of the same size as the on-heap one.
 So however many buffers you want to transfer, the same amount of off-heap 
 memory will be allocated.
 But once the allocated memory reaches the capacity of the overhead memory set 
 in YARN, this executor will be killed.
 I wrote a simple piece of code to test it:
 progress:
 1. bin/spark-shell --master yarn-client --executor-cores 40 --num-executors 1
 2. :load test.scala in spark-shell
 3. use the command {pid=$(jps|grep CoarseGrainedExecutorBackend |awk '{print $1}');top -b -p $pid|grep $pid} to watch the executor on the slave node
 4. test_driver(20,100)(test) in spark-shell
 5. watch the output of the command on the slave node
 If multiple threads are used to get len, the physical memory will soon exceed 
 the limit set by spark.yarn.executor.memoryOverhead.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-6056) Unlimit offHeap memory use cause RM killing the container

2015-02-27 Thread SaintBacchus (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14339861#comment-14339861
 ] 

SaintBacchus edited comment on SPARK-6056 at 2/27/15 8:13 AM:
--

Hi [~adav] [~lianhuiwang] [~zzcclp], I have read your discussion at 
https://issues.apache.org/jira/browse/SPARK-2468. I am meeting a similar problem 
again.
No matter whether `preferDirectBufs` is set or the number of threads is limited, 
Spark cannot limit the use of off-heap memory.
At line 269 of the class 'AbstractNioByteChannel' in netty-4.0.23.Final, Netty 
allocates an off-heap memory buffer of the same size as the on-heap one.
So however many buffers you want to transfer, the same amount of off-heap 
memory will be allocated.
But once the allocated memory reaches the capacity of the overhead memory set 
in YARN, this executor will be killed.



was (Author: carlmartin):
Hi [~adav] [~lianhuiwang] [~zzcclp], I have read your discussion at 
https://issues.apache.org/jira/browse/SPARK-2468. I am meeting a similar problem 
again.
No matter whether `preferDirectBufs` is set or the number of threads is limited, 
it cannot limit the use of off-heap memory.
At line 269 of the class 'AbstractNioByteChannel' in netty-4.0.23.Final, Netty 
allocates an off-heap memory buffer of the same size as the on-heap one.
So however many buffers you want to transfer, the same amount of off-heap 
memory will be allocated.
But once the allocated memory reaches the capacity of the overhead memory set 
in YARN, this executor will be killed.


 Unlimit offHeap memory use cause RM killing the container
 -

 Key: SPARK-6056
 URL: https://issues.apache.org/jira/browse/SPARK-6056
 Project: Spark
  Issue Type: Bug
  Components: Shuffle, Spark Core
Affects Versions: 1.2.1
Reporter: SaintBacchus





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-6056) Unlimit offHeap memory use cause RM killing the container

2015-02-27 Thread SaintBacchus (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-6056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

SaintBacchus updated SPARK-6056:

Description: 
No matter whether `preferDirectBufs` is set or the number of threads is limited, 
Spark cannot limit the use of off-heap memory.
At line 269 of the class 'AbstractNioByteChannel' in netty-4.0.23.Final, Netty 
allocates an off-heap memory buffer of the same size as the on-heap one.
So however many buffers you want to transfer, the same amount of off-heap 
memory will be allocated.
But once the allocated memory reaches the capacity of the overhead memory set 
in YARN, this executor will be killed.
I wrote a simple piece of code to test it:
```scala
val bufferRdd = sc.makeRDD(0 to 10, 10).map(x => new Array[Byte](10*1024*1024)).persist
bufferRdd.count
val part = bufferRdd.partitions(0)
val sparkEnv = SparkEnv.get
val blockMgr = sparkEnv.blockManager
val blockOption = blockMgr.get(RDDBlockId(bufferRdd.id, part.index))
val resultIt = blockOption.get.data.asInstanceOf[Iterator[Array[Byte]]]
val len = resultIt.map(_.length).sum
```
If multiple threads are used to get len, the physical memory will soon exceed 
the limit set by spark.yarn.executor.memoryOverhead.

 Unlimit offHeap memory use cause RM killing the container
 -

 Key: SPARK-6056
 URL: https://issues.apache.org/jira/browse/SPARK-6056
 Project: Spark
  Issue Type: Bug
  Components: Shuffle, Spark Core
Affects Versions: 1.2.1
Reporter: SaintBacchus

 No matter whether `preferDirectBufs` is set or the number of threads is 
 limited, Spark cannot limit the use of off-heap memory.
 At line 269 of the class 'AbstractNioByteChannel' in netty-4.0.23.Final, 
 Netty allocates an off-heap memory buffer of the same size as the on-heap one.
 So however many buffers you want to transfer, the same amount of off-heap 
 memory will be allocated.
 But once the allocated memory reaches the capacity of the overhead memory set 
 in YARN, this executor will be killed.
 I wrote a simple piece of code to test it:
 ```scala
 val bufferRdd = sc.makeRDD(0 to 10, 10).map(x => new Array[Byte](10*1024*1024)).persist
 bufferRdd.count
 val part = bufferRdd.partitions(0)
 val sparkEnv = SparkEnv.get
 val blockMgr = sparkEnv.blockManager
 val blockOption = blockMgr.get(RDDBlockId(bufferRdd.id, part.index))
 val resultIt = blockOption.get.data.asInstanceOf[Iterator[Array[Byte]]]
 val len = resultIt.map(_.length).sum
 ```
 If multiple threads are used to get len, the physical memory will soon exceed 
 the limit set by spark.yarn.executor.memoryOverhead.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-6056) Unlimit offHeap memory use cause RM killing the container

2015-02-27 Thread SaintBacchus (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14339861#comment-14339861
 ] 

SaintBacchus commented on SPARK-6056:
-

Hi [~adav] [~lianhuiwang] [~zzcclp], I have read your discussion at 
https://issues.apache.org/jira/browse/SPARK-2468. I am meeting a similar problem 
again.
No matter whether `preferDirectBufs` is set or the number of threads is limited, 
it cannot limit the use of off-heap memory.
At line 269 of the class 'AbstractNioByteChannel' in netty-4.0.23.Final, Netty 
allocates an off-heap memory buffer of the same size as the on-heap one.
So however many buffers you want to transfer, the same amount of off-heap 
memory will be allocated.
But once the allocated memory reaches the capacity of the overhead memory set 
in YARN, this executor will be killed.


 Unlimit offHeap memory use cause RM killing the container
 -

 Key: SPARK-6056
 URL: https://issues.apache.org/jira/browse/SPARK-6056
 Project: Spark
  Issue Type: Bug
  Components: Shuffle, Spark Core
Affects Versions: 1.2.1
Reporter: SaintBacchus





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-6056) Unlimit offHeap memory use cause RM killing the container

2015-02-26 Thread SaintBacchus (JIRA)
SaintBacchus created SPARK-6056:
---

 Summary: Unlimit offHeap memory use cause RM killing the container
 Key: SPARK-6056
 URL: https://issues.apache.org/jira/browse/SPARK-6056
 Project: Spark
  Issue Type: Bug
  Components: Shuffle, Spark Core
Affects Versions: 1.2.1
Reporter: SaintBacchus






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-5444) 'spark.blockManager.port' conflict in netty service

2015-01-27 Thread SaintBacchus (JIRA)
SaintBacchus created SPARK-5444:
---

 Summary: 'spark.blockManager.port' conflict in netty service
 Key: SPARK-5444
 URL: https://issues.apache.org/jira/browse/SPARK-5444
 Project: Spark
  Issue Type: Bug
  Components: Block Manager
Affects Versions: 1.2.0
Reporter: SaintBacchus


If `spark.blockManager.port` is set to 4040 in spark-defaults.conf, Spark throws a 
port-conflict exception and exits directly.
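A minimal sketch of the conflict (my own illustration; presumably 4040 collides because 
the Spark web UI listens on 4040 by default):
{code:borderStyle=solid}
import org.apache.spark.SparkConf

// Illustration only: 4040 is the default Spark UI port, so the block manager
// fails to bind and the application exits instead of retrying another port.
val conflicting = new SparkConf().set("spark.blockManager.port", "4040")

// Choosing a port outside the commonly used defaults avoids the immediate conflict.
val workaround = new SparkConf().set("spark.blockManager.port", "45000")
{code}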



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-5065) BroadCast can still work after sc had been stopped.

2015-01-03 Thread SaintBacchus (JIRA)
SaintBacchus created SPARK-5065:
---

 Summary: BroadCast can still work after sc had been stopped.
 Key: SPARK-5065
 URL: https://issues.apache.org/jira/browse/SPARK-5065
 Project: Spark
  Issue Type: Bug
Affects Versions: 1.2.1
Reporter: SaintBacchus


Code as follows:
{code:borderStyle=solid}
val sc1 = new SparkContext
val sc2 = new SparkContext
sc1.broadcast(1)
sc1.stop
{code}
It still works, because sc1.broadcast reuses the BlockManager from sc2.
To fix it, throw a SparkException when the BroadcastManager has been stopped.
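A minimal sketch of such a guard (hypothetical names, not the actual BroadcastManager 
implementation):
{code:borderStyle=solid}
import org.apache.spark.SparkException

// Hypothetical sketch of the proposed guard: remember that the manager has
// been stopped and refuse to create new broadcasts afterwards.
class GuardedBroadcastManager {
  @volatile private var stopped = false

  def stop(): Unit = { stopped = true }

  def newBroadcast[T](value: T): T = {
    if (stopped) {
      throw new SparkException("Attempted to use the BroadcastManager after it was stopped")
    }
    value // placeholder for the real broadcast creation
  }
}
{code}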



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-5065) BroadCast can still work after sc had been stopped.

2015-01-03 Thread SaintBacchus (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-5065?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

SaintBacchus updated SPARK-5065:

Description: 
Code as follows:
{code:borderStyle=solid}
val sc1 = new SparkContext
val sc2 = new SparkContext
sc1.stop
sc1.broadcast(1)
{code}
It still works, because sc1.broadcast reuses the BlockManager from sc2.
To fix it, throw a SparkException when the BroadcastManager has been stopped.

  was:
Code as follows:
{code:borderStyle=solid}
val sc1 = new SparkContext
val sc2 = new SparkContext
sc1.broadcast(1)
sc1.stop
{code}
It still works, because sc1.broadcast reuses the BlockManager from sc2.
To fix it, throw a SparkException when the BroadcastManager has been stopped.


 BroadCast can still work after sc had been stopped.
 ---

 Key: SPARK-5065
 URL: https://issues.apache.org/jira/browse/SPARK-5065
 Project: Spark
  Issue Type: Bug
Affects Versions: 1.2.1
Reporter: SaintBacchus

 Code as follows:
 {code:borderStyle=solid}
 val sc1 = new SparkContext
 val sc2 = new SparkContext
 sc1.stop
 sc1.broadcast(1)
 {code}
 It still works, because sc1.broadcast reuses the BlockManager from sc2.
 To fix it, throw a SparkException when the BroadcastManager has been stopped.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-4929) Yarn Client mode can not support the HA after the exitcode change

2014-12-22 Thread SaintBacchus (JIRA)
SaintBacchus created SPARK-4929:
---

 Summary: Yarn Client mode can not support the HA after the 
exitcode change
 Key: SPARK-4929
 URL: https://issues.apache.org/jira/browse/SPARK-4929
 Project: Spark
  Issue Type: Bug
  Components: YARN
Affects Versions: 1.2.0
Reporter: SaintBacchus


Currently, yarn-client exits directly when a ResourceManager HA failover happens, no 
matter how many times the AM should retry.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-4694) Long-run user thread(such as HiveThriftServer2) causes the 'process leak' in yarn-client mode

2014-12-03 Thread SaintBacchus (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14232724#comment-14232724
 ] 

SaintBacchus commented on SPARK-4694:
-

Thanks for the reply, [~vanzin]. The problem is clear: the scheduler backend saw that the 
AM had exited, so it called sc.stop to shut down the driver process, but a user thread 
was still alive, which causes this problem.
To fix this, use System.exit(-1) instead of sc.stop so that the JVM does not wait for 
all user threads to finish before exiting.
Can I use System.exit() in Spark code?
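For illustration, this is what the suggested change would amount to in the monitoring 
code (a sketch, not the actual YarnClientSchedulerBackend source):
{code:borderStyle=solid}
import org.apache.spark.SparkContext

// Sketch only. sc.stop() shuts Spark down, but the JVM keeps running while any
// non-daemon user thread (e.g. HiveThriftServer2) is still alive; System.exit
// terminates the JVM regardless of such threads.
def onApplicationMasterExited(sc: SparkContext): Unit = {
  sc.stop()          // current behaviour: clean shutdown, but the JVM may linger
  // System.exit(-1) // proposed: force the JVM down even with live user threads
}
{code}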

 Long-run user thread(such as HiveThriftServer2) causes the 'process leak' in 
 yarn-client mode
 -

 Key: SPARK-4694
 URL: https://issues.apache.org/jira/browse/SPARK-4694
 Project: Spark
  Issue Type: Bug
  Components: YARN
Reporter: SaintBacchus

 Recently, when I used YARN HA mode to test HiveThriftServer2, I found that the 
 driver cannot exit by itself.
 To reproduce it, do the following:
 1. Use YARN HA mode and set am.maxAttempt = 1 for convenience.
 2. Kill the active ResourceManager in the cluster.
 The expected result is a plain failure, because maxAttempt was 1.
 But the actual result is that all executors end while the driver is 
 still there and never closes.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-4694) Long-run user thread(such as HiveThriftServer2) causes the 'process leak' in yarn-client mode

2014-12-02 Thread SaintBacchus (JIRA)
SaintBacchus created SPARK-4694:
---

 Summary: Long-run user thread(such as HiveThriftServer2) causes 
the 'process leak' in yarn-client mode
 Key: SPARK-4694
 URL: https://issues.apache.org/jira/browse/SPARK-4694
 Project: Spark
  Issue Type: Bug
Reporter: SaintBacchus


Recently, when I used YARN HA mode to test HiveThriftServer2, I found that the 
driver cannot exit by itself.
To reproduce it, do the following:
1. Use YARN HA mode and set am.maxAttempt = 1 for convenience.
2. Kill the active ResourceManager in the cluster.

The expected result is a plain failure, because maxAttempt was 1.

But the actual result is that all executors end while the driver is still 
there and never closes.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-4694) Long-run user thread(such as HiveThriftServer2) causes the 'process leak' in yarn-client mode

2014-12-02 Thread SaintBacchus (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-4694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

SaintBacchus updated SPARK-4694:

Component/s: YARN

 Long-run user thread(such as HiveThriftServer2) causes the 'process leak' in 
 yarn-client mode
 -

 Key: SPARK-4694
 URL: https://issues.apache.org/jira/browse/SPARK-4694
 Project: Spark
  Issue Type: Bug
  Components: YARN
Reporter: SaintBacchus

 Recently, when I used YARN HA mode to test HiveThriftServer2, I found that the 
 driver cannot exit by itself.
 To reproduce it, do the following:
 1. Use YARN HA mode and set am.maxAttempt = 1 for convenience.
 2. Kill the active ResourceManager in the cluster.
 The expected result is a plain failure, because maxAttempt was 1.
 But the actual result is that all executors end while the driver is 
 still there and never closes.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-4694) Long-run user thread(such as HiveThriftServer2) causes the 'process leak' in yarn-client mode

2014-12-02 Thread SaintBacchus (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14231440#comment-14231440
 ] 

SaintBacchus commented on SPARK-4694:
-

The reason is that YARN reported the status to the RM, and the 
YarnClientSchedulerBackend detected that status and stopped the SparkContext in the 
function 'asyncMonitorApplication'.
But HiveThriftServer2 runs as a long-lived user thread, and the JVM will not exit until 
all non-daemon threads have ended or System.exit() is called.
That is what causes this problem.
The easiest way to resolve it is to use System.exit(0) instead of sc.stop in the 
function 'asyncMonitorApplication'.
But System.exit is not recommended in 
https://issues.apache.org/jira/browse/SPARK-4584
Do you have any ideas about this problem? [~vanzin]
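A tiny standalone illustration of the JVM behaviour described above (not Spark code):
{code:borderStyle=solid}
// The JVM does not exit while a non-daemon thread is alive, which is why the
// driver lingers after sc.stop() when HiveThriftServer2 keeps a user thread running.
object NonDaemonDemo {
  def main(args: Array[String]): Unit = {
    val t = new Thread(new Runnable {
      override def run(): Unit = Thread.sleep(60 * 1000)
    })
    t.setDaemon(false) // the default, shown explicitly
    t.start()
    println("main returns now, but the JVM stays up for about a minute")
    // System.exit(0) // uncommenting this would terminate the JVM immediately
  }
}
{code}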

 Long-run user thread(such as HiveThriftServer2) causes the 'process leak' in 
 yarn-client mode
 -

 Key: SPARK-4694
 URL: https://issues.apache.org/jira/browse/SPARK-4694
 Project: Spark
  Issue Type: Bug
  Components: YARN
Reporter: SaintBacchus

 Recently, when I used YARN HA mode to test HiveThriftServer2, I found that the 
 driver cannot exit by itself.
 To reproduce it, do the following:
 1. Use YARN HA mode and set am.maxAttempt = 1 for convenience.
 2. Kill the active ResourceManager in the cluster.
 The expected result is a plain failure, because maxAttempt was 1.
 But the actual result is that all executors end while the driver is 
 still there and never closes.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org