[jira] [Comment Edited] (SPARK-5124) Standardize internal RPC interface

2015-03-04 Thread Shixiong Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14348215#comment-14348215
 ] 

Shixiong Zhu edited comment on SPARK-5124 at 3/5/15 6:09 AM:
-

I made some changes to the above APIs.

1. I added a new trait RpcResponse because RpcResponseCallback cannot be used 
directly. Both the `receive` and `receiveAndReply` methods can receive messages when 
the sender is an RpcEndpoint, so if the receiver wants to reply to the sender, it 
needs a way to specify which of those methods the reply should be sent to. Here is the 
interface of RpcResponse:

{quote}
trait RpcResponse {
  def reply(response: Any): Unit
  def replyWithSender(response: Any, sender: RpcEndpointRef): Unit
  def fail(e: Throwable): Unit
}
{quote}

Calling `reply` will send the message to RpcEndpoint.receive, and calling 
`replyWithSender` will send the message to RpcEndpoint.receiveAndReply.

2. Instead of adding a trait like ThreadSafeRpcEndpoint, I added a new method 
`setupThreadSafeEndpoint` to RpcEnv, so that RpcEnv can decide how to implement 
the thread-safe semantics internally. For `setupEndpoint`, RpcEnv won't 
guarantee that messages are delivered thread-safely.

[~vanzin] For the local Endpoint idea, I think we need a local message 
dispatcher to send/receive messages between different Endpoints. It cannot be 
implemented in the Endpoint alone. We would also need an EndpointRef to refer to an 
Endpoint, and a generic trait for both the local message dispatcher and RpcEnv. It 
looks a bit complex. What do you think?

Please help review my PR: https://github.com/apache/spark/pull/4588


was (Author: zsxwing):
I made some changes to the above APIs.

1. I added a new trait RpcResponse because RpcResponseCallback cannot be used 
directly. Both the `receive` and `receiveAndReply` methods can receive messages when 
the sender is an RpcEndpoint, so if the receiver wants to reply to the sender, it 
needs a way to specify which of those methods the reply should be sent to. Here is the 
interface of RpcResponse:

{quote}
private[spark] trait RpcResponse {
  def reply(response: Any): Unit
  def replyWithSender(response: Any, sender: RpcEndpointRef): Unit
  def fail(e: Throwable): Unit
}
{quote}

Calling `reply` will send the message to RpcEndpoint.receive, and calling 
`replyWithSender` will send the message to RpcEndpoint.receiveAndReply.

2. Instead of adding a trait like ThreadSafeRpcEndpoint, I added a new method 
`setupThreadSafeEndpoint` to RpcEnv, so that RpcEnv can decide how to implement 
the thread-safe semantics internally. For `setupEndpoint`, RpcEnv won't 
guarantee that messages are delivered thread-safely.

[~vanzin] For the local Endpoint idea, I think we need a local message 
dispatcher to send/receive messages between different Endpoints. It cannot be 
implemented in the Endpoint alone. We would also need an EndpointRef to refer to an 
Endpoint, and a generic trait for both the local message dispatcher and RpcEnv. It 
looks a bit complex. What do you think?

Please help review my PR: https://github.com/apache/spark/pull/4588

> Standardize internal RPC interface
> --
>
> Key: SPARK-5124
> URL: https://issues.apache.org/jira/browse/SPARK-5124
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Reporter: Reynold Xin
>Assignee: Shixiong Zhu
> Attachments: Pluggable RPC - draft 1.pdf, Pluggable RPC - draft 2.pdf
>
>
> In Spark we use Akka as the RPC layer. It would be great if we can 
> standardize the internal RPC interface to facilitate testing. This will also 
> provide the foundation to try other RPC implementations in the future.






[jira] [Comment Edited] (SPARK-5124) Standardize internal RPC interface

2015-03-04 Thread Shixiong Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14348215#comment-14348215
 ] 

Shixiong Zhu edited comment on SPARK-5124 at 3/5/15 6:10 AM:
-

I made some changes to the above APIs.

1. I added a new trait RpcResponse because RpcResponseCallback cannot be used 
directly. Both the `receive` and `receiveAndReply` methods can receive messages when 
the sender is an RpcEndpoint, so if the receiver wants to reply to the sender, it 
needs a way to specify which of those methods the reply should be sent to. Here is the 
interface of RpcResponse:

{code}
private[spark] trait RpcResponse {
  def reply(response: Any): Unit
  def replyWithSender(response: Any, sender: RpcEndpointRef): Unit
  def fail(e: Throwable): Unit
}
{code}

Calling `reply` will send the message to RpcEndpoint.receive, and calling 
`replyWithSender` will send the message to RpcEndpoint.receiveAndReply.
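
For illustration, here is a rough sketch of how an endpoint might use this. The shape of `receiveAndReply` (taking the RpcResponse callback as a second parameter) is my assumption from the description above, not the final API:

{code}
// Hypothetical sketch only: assumes receiveAndReply is handed an RpcResponse
// callback together with the message.
class EchoEndpoint extends RpcEndpoint {
  override def receive(message: Any): Unit = message match {
    case text: String => println(s"one-way message: $text")
  }

  override def receiveAndReply(message: Any, response: RpcResponse): Unit = message match {
    case text: String =>
      // The reply is routed back to the sender's receive method.
      response.reply(s"echo: $text")
    case other =>
      response.fail(new IllegalArgumentException(s"unexpected message: $other"))
  }
}
{code}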

2. Instead of adding a trait like ThreadSafeRpcEndpoint, I added a new method 
`setupThreadSafeEndpoint` to RpcEnv, so that RpcEnv can decide how to implement 
the thread-safe semantics internally. For `setupEndpoint`, RpcEnv won't 
guarantee that messages are delivered thread-safely.
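
A minimal sketch of what the two registration methods could look like on RpcEnv; the signatures below are guessed from this description, not taken from the PR:

{code}
// Hypothetical signatures inferred from the description above.
abstract class RpcEnv {
  // No thread-safety guarantee: the endpoint may handle messages concurrently.
  def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef

  // RpcEnv guarantees messages are delivered to the endpoint one at a time,
  // however it chooses to implement that internally (e.g. a dispatcher queue).
  def setupThreadSafeEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef
}
{code}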

[~vanzin] For the local Endpoint idea, I think we need a local message 
dispatcher to send/receive messages between different Endpoints. It cannot be 
implemented in the Endpoint alone. We would also need an EndpointRef to refer to an 
Endpoint, and a generic trait for both the local message dispatcher and RpcEnv. It 
looks a bit complex. What do you think?

Please help review my PR: https://github.com/apache/spark/pull/4588


was (Author: zsxwing):
I made some changes to the above APIs.

1. I added a new trait RpcResponse because RpcResponseCallback cannot be used 
directly. Both the `receive` and `receiveAndReply` methods can receive messages when 
the sender is an RpcEndpoint, so if the receiver wants to reply to the sender, it 
needs a way to specify which of those methods the reply should be sent to. Here is the 
interface of RpcResponse:

{quote}
trait RpcResponse {
  def reply(response: Any): Unit
  def replyWithSender(response: Any, sender: RpcEndpointRef): Unit
  def fail(e: Throwable): Unit
}
{quote}

Calling `reply` will send the message to RpcEndpoint.receive, and calling 
`replyWithSender` will send the message to RpcEndpoint.receiveAndReply.

2. Instead of adding a trait like ThreadSafeRpcEndpoint, I added a new method 
`setupThreadSafeEndpoint` to RpcEnv, so that RpcEnv can decide how to implement 
the thread-safe semantics internally. For `setupEndpoint`, RpcEnv won't 
guarantee that messages are delivered thread-safely.

[~vanzin] For the local Endpoint idea, I think we need a local message 
dispatcher to send/receive messages between different Endpoints. It cannot be 
implemented in the Endpoint alone. We would also need an EndpointRef to refer to an 
Endpoint, and a generic trait for both the local message dispatcher and RpcEnv. It 
looks a bit complex. What do you think?

Please help review my PR: https://github.com/apache/spark/pull/4588

> Standardize internal RPC interface
> --
>
> Key: SPARK-5124
> URL: https://issues.apache.org/jira/browse/SPARK-5124
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Reporter: Reynold Xin
>Assignee: Shixiong Zhu
> Attachments: Pluggable RPC - draft 1.pdf, Pluggable RPC - draft 2.pdf
>
>
> In Spark we use Akka as the RPC layer. It would be great if we can 
> standardize the internal RPC interface to facilitate testing. This will also 
> provide the foundation to try other RPC implementations in the future.






[jira] [Comment Edited] (SPARK-5124) Standardize internal RPC interface

2015-01-09 Thread Reynold Xin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14270771#comment-14270771
 ] 

Reynold Xin edited comment on SPARK-5124 at 1/9/15 8:49 AM:


Thanks for doing this. I haven't looked through all the changes yet, but here 
are some quick comments:

1. For DAGScheduler, we are probably OK to just use an event loop instead of an 
actor: put messages into a queue, and have a loop that processes the queue. 
Otherwise we would be making every call to DAGScheduler go through a socket, which 
can severely impact scheduler throughput. (Although I haven't looked closely at 
your change, so maybe you are doing a different thing here.)
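
For example, a bare-bones in-process event loop along those lines (illustrative only, not a claim about how the DAGScheduler change is actually written):

{code}
// Minimal single-threaded event loop backed by a queue.
import java.util.concurrent.LinkedBlockingQueue

class EventLoop[E](name: String)(process: E => Unit) {
  private val queue = new LinkedBlockingQueue[E]()
  @volatile private var stopped = false

  private val thread = new Thread(name) {
    override def run(): Unit = {
      try {
        while (!stopped) {
          process(queue.take())  // handle events one at a time, in order
        }
      } catch {
        case _: InterruptedException => // stopped
      }
    }
  }
  thread.setDaemon(true)

  def start(): Unit = thread.start()
  def post(event: E): Unit = queue.put(event)
  def stop(): Unit = { stopped = true; thread.interrupt() }
}
{code}

Callers would just post scheduler events onto the loop instead of sending them through an actor, so nothing crosses a socket.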

2. Is it really that expensive to listen to network events that it warrants a 
separate NetworkRpcEndpoint?

3. Can you comment on the following two?
* Thread-safe contract when processing messages (similar to Akka).
* Simple fault tolerance (if an RpcEndpoint crashes, restart it or stop it).

Personally I think those are OK to leave up to the RPC user (e.g. 
BlockManagerMasterActor should be able to decide whether it needs to lock or not, 
and an RpcEndpoint should never have uncaught exceptions).


was (Author: rxin):
Thanks for doing this. I haven't looked through all the changes yet, but here 
are some quick comments:

1. For DAGScheduler, we are probably OK to just use an event loop instead of an 
actor: put messages into a queue, and have a loop that processes the queue. 
Otherwise we would be making every call to DAGScheduler go through a socket, which 
can severely impact scheduler throughput. (Although I haven't looked closely at 
your change, so maybe you are doing a different thing here.)

2. Is it really that expensive to listen to network events that it warrants a 
separate NetworkRpcEndpoint?

> Standardize internal RPC interface
> --
>
> Key: SPARK-5124
> URL: https://issues.apache.org/jira/browse/SPARK-5124
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Reporter: Reynold Xin
>Assignee: Shixiong Zhu
> Attachments: Pluggable RPC - draft 1.pdf
>
>
> In Spark we use Akka as the RPC layer. It would be great if we can 
> standardize the internal RPC interface to facilitate testing. This will also 
> provide the foundation to try other RPC implementations in the future.






[jira] [Comment Edited] (SPARK-5124) Standardize internal RPC interface

2015-02-23 Thread Marcelo Vanzin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14334166#comment-14334166
 ] 

Marcelo Vanzin edited comment on SPARK-5124 at 2/24/15 1:08 AM:


Hi [~zsxwing], I briefly looked over the design and parts of the 
implementation, and I have some comments. Overall this looks OK; it kinda looks 
a lot like akka's actor system, but I don't think there would be a problem 
implementing it over a different RPC library.

One thing that I found very confusing is {{RpcEndpoint}} vs. 
{{NetworkRpcEndpoint}}. The "R" in RPC means "remote", which sort of implies 
networking. I think Reynold touched on this when he talked about using an event 
loop for "local actors". Perhaps a more flexible approach would be something 
like:

- A generic {{Endpoint}} that defines the {{receive()}} method and other 
generic interfaces
- {{RpcEnv}} takes an {{Endpoint}} and a name to register endpoints for remote 
connections. Things like "onConnected" and "onDisassociated" become messages 
passed to {{receive()}}, kinda like in akka today. This means that there's no 
need for a specialized {{RpcEndpoint}} interface.
- "local" Endpoints could be exposed directly, without the need for an 
{{RpcEnv}}. For example, as a getter in {{SparkEnv}}. You could have some 
wrapper to expose {{send()}} and {{ask()}} so that the client interface looks 
similar for remote and local endpoints.
- The default {{Endpoint}} has no thread-safety guarantees. You can wrap an 
{{Endpoint}} in an {{EventLoop}} if you want messages to be handled using a 
queue, or synchronize your {{receive()}} method (although that can block the 
dispatcher thread, which could be bad). But this would easily allow actors to 
process multiple messages concurrently if desired.
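
For illustration, a rough sketch of the last bullet, with all names hypothetical since none of these interfaces exist yet:

{code}
// Hypothetical sketch: serialize message handling for an Endpoint by wrapping
// it in a queue-backed event loop, instead of baking thread-safety into RpcEnv.
import java.util.concurrent.LinkedBlockingQueue

trait Endpoint {
  def receive(message: Any): Unit  // no thread-safety guarantee by default
}

class QueuedEndpoint(underlying: Endpoint) extends Endpoint {
  private val queue = new LinkedBlockingQueue[Any]()
  private val worker = new Thread("endpoint-event-loop") {
    override def run(): Unit = {
      while (true) underlying.receive(queue.take())  // one message at a time
    }
  }
  worker.setDaemon(true)
  worker.start()

  // The caller's thread only enqueues, so it is never blocked by a slow handler.
  override def receive(message: Any): Unit = queue.put(message)
}
{code}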

What do you think?




was (Author: vanzin):
Hi [~zsxwing], I briefly looked over the design and parts of the 
implementation, and I have some comments. Overall this looks OK; it kinda looks 
a lot like akka's actor system, but I don't think there would be a problem 
implementing it over a different RPC library.

One thing that I found very confusing is {{RpcEndpoint}} vs. 
{{NetworkRpcEndpoint}}. The "R" in RPC means "remote", which sort of implies 
networking. I think Reynold touched on this when he talked about using an event 
loop for "local actors". Perhaps a more flexible approach would be something 
like:

- A generic {{Endpoint}} that defines the {{receive()}} method and other 
generic interfaces
- {{RpcEnv}} takes an {{Endpoint}} and a name to register endpoints for remote 
connections. Things like "onConnected" and "onDisassociated" become messages 
passed to {{receive()}}, kinda like in akka today. This means that there's no 
need for a specialized {{RpcEndpoint}} interface.
- "local" Endpoints could be exposed directly, without the need for an 
{{RpcEnv}}. For example, as a getter in {{SparkEnv}}. You could have some 
wrapper to expose {{send()}} and {{ask()}} so that the client interface looks 
similar for remote and local endpoints.
- The default {{Endpoint}} has no thread-safety guarantees. You can wrap an 
{{Endpoint}} in an {{EventLoop}} if you want messages to be handled using a 
queue, or synchronize your {{receive()}} method (although that can block the 
dispatcher thread, which could be bad). But this would easily allow actors to 
process multiple messages concurrently if desired.

What do you think?



> Standardize internal RPC interface
> --
>
> Key: SPARK-5124
> URL: https://issues.apache.org/jira/browse/SPARK-5124
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Reporter: Reynold Xin
>Assignee: Shixiong Zhu
> Attachments: Pluggable RPC - draft 1.pdf, Pluggable RPC - draft 2.pdf
>
>
> In Spark we use Akka as the RPC layer. It would be great if we can 
> standardize the internal RPC interface to facilitate testing. This will also 
> provide the foundation to try other RPC implementations in the future.






[jira] [Comment Edited] (SPARK-5124) Standardize internal RPC interface

2015-02-24 Thread Marcelo Vanzin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14335252#comment-14335252
 ] 

Marcelo Vanzin edited comment on SPARK-5124 at 2/24/15 7:03 PM:


Hi @rxin,

Yeah, the current {{receive()}} gives you the message and the sender, but it 
doesn't give you a way to explicitly reply to a message. That means two things:

- It's not clear whether calling something like {{sender.send()}} would be 
thread-safe. For example, in akka it would be, because akka processes one message 
at a time for each actor, so you're sending replies in order. But that forces 
you to implement the same semantics in any new RPC layer, and that is one akka 
feature that I'd prefer not to inherit (as you say, let each endpoint 
decide whether it needs it).

- It precludes handling that message asynchronously, for much the same 
reasons. E.g., let's say some incoming message requires a long computation 
before sending the reply, and you use a thread pool for that. With 
{{sender.send()}}, you'd need to build correlation logic into your application. 
If you have something like {{message.reply()}} instead, you don't need to: the 
RPC layer handles correlation and timing out if a reply doesn't arrive.

Granted, I have no idea how you'd implement that second bullet in akka, and 
perhaps for that reason I don't think Spark currently uses that pattern 
anywhere. But it sounds like a useful feature to have.
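
For example, a rough sketch of what the second bullet could look like with a {{message.reply()}}-style API (everything here is hypothetical, just to show the shape):

{code}
// Hypothetical sketch: handle a request asynchronously and reply from a pool thread.
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext

trait RpcMessage {
  def payload: Any
  def reply(response: Any): Unit   // the RPC layer correlates this with the request
  def fail(error: Throwable): Unit
}

class ComputeEndpoint {
  private val pool = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(4))

  def receive(message: RpcMessage): Unit = {
    // Hand the expensive work to a thread pool; the reply can be sent later from
    // any thread, with no correlation logic in the application itself.
    pool.execute(new Runnable {
      override def run(): Unit = {
        try message.reply(longComputation(message.payload))
        catch { case e: Exception => message.fail(e) }
      }
    })
  }

  private def longComputation(input: Any): Any = input  // placeholder
}
{code}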



was (Author: vanzin):
Hi @rxin,

Yeah, the current {{receive()}} gives you the message and the sender, but it 
doesn't give you a way to explicitly reply to a message. That means two things:

- It's not clear whether calling something like {{sender.send()}} would be 
thread-safe. For example, in akka it would be, because akka processes one message 
at a time for each actor, so you're sending replies in order. But that forces 
you to implement the same semantics in any new RPC layer, and that is one akka 
feature that I'd prefer not to inherit (as you say, let each endpoint 
decide whether it needs it).

- It precludes handling that message asynchronously, for much the same 
reasons. E.g., let's say some incoming message requires a long computation 
before sending the reply, and you use a thread pool for that. With 
{{sender.send()}}, you'd need to build correlation logic into your application. 
If you have something like {{message.reply()}} instead, you don't need to: the 
RPC layer handles correlation and timing out if a reply doesn't arrive.

Granted, I have no idea how you'd implement that second bullet in akka, and 
perhaps for that reason I don't think Spark currently uses that pattern 
anywhere. But it sounds like a useful feature to have.


> Standardize internal RPC interface
> --
>
> Key: SPARK-5124
> URL: https://issues.apache.org/jira/browse/SPARK-5124
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Reporter: Reynold Xin
>Assignee: Shixiong Zhu
> Attachments: Pluggable RPC - draft 1.pdf, Pluggable RPC - draft 2.pdf
>
>
> In Spark we use Akka as the RPC layer. It would be great if we can 
> standardize the internal RPC interface to facilitate testing. This will also 
> provide the foundation to try other RPC implementations in the future.






[jira] [Comment Edited] (SPARK-5124) Standardize internal RPC interface

2015-02-25 Thread Reynold Xin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14337028#comment-14337028
 ] 

Reynold Xin edited comment on SPARK-5124 at 2/25/15 7:29 PM:
-

I went and looked at the various use cases of Akka in the code base. Quite a 
few of them actually also use askWithReply. There are really two categories of 
message delivery: messages for which the sender expects a reply, and messages 
for which it doesn't. As a result, I think the following interface would make 
more sense:

{code}
// receive side:
def receive(msg: RpcMessage): Unit
def receiveAndReply(msg: RpcMessage): Unit = sys.error("...")

// send side:
def send(message: Object): Unit
def sendWithReply[T](message: Object): Future[T]
{code}

There is an obvious pairing here. Messages sent via "send" go to "receive", 
without requiring an ack (although if we use the transport layer, an implicit 
ack can be sent). Messages sent via "sendWithReply" go to "receiveAndReply", 
and the caller needs to explicitly handle the reply.

In most cases, an RPC endpoint only needs to receive messages without replying, 
and thus can override just receive. By making an obvious distinction between the 
two, I think we mitigate the risk of an endpoint not properly responding to a 
message, which could lead to memory leaks.
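
For illustration, a sketch of how the pairing could look from both sides. The message types, the endpoint, and the msg.payload/msg.reply accessors are placeholders, not a settled design:

{code}
// Hypothetical sketch of the send/receive vs. sendWithReply/receiveAndReply pairing.
// RpcMessage is assumed to carry the payload plus a reply callback.
case class Heartbeat(executorId: String)
case class GetBlockStatus(blockId: String)

class BlockManagerEndpoint extends RpcEndpoint {
  // One-way messages: delivered via send, no reply expected.
  override def receive(msg: RpcMessage): Unit = msg.payload match {
    case Heartbeat(id) => println(s"heartbeat from $id")
  }

  // Request/response messages: delivered via sendWithReply, the caller awaits a Future.
  override def receiveAndReply(msg: RpcMessage): Unit = msg.payload match {
    case GetBlockStatus(blockId) => msg.reply(s"status of $blockId")
  }
}

// Caller side (endpointRef is some reference to the endpoint above):
//   endpointRef.send(Heartbeat("exec-1"))                            // fire and forget
//   val status = endpointRef.sendWithReply[String](GetBlockStatus("rdd_0_0"))
//   status.foreach(println)  // Future[String] completed by the reply
{code}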





was (Author: rxin):
I went and looked at the various use cases of Akka in the code base. Quite a 
few of them actually also use askWithReply. There are really two categories of 
message delivery: messages for which the sender expects a reply, and messages 
for which it doesn't. As a result, I think the following interface would make 
more sense:

{code}
// receive side:
def receive(msg: RpcMessage): Unit
def receiveAndReply(msg: RpcMessage): Unit = sys.error("...")

// send side:
def send(message: Object): Unit
def sendWithReply[T](message: Object): Future[T]
{code}

There is an obvious pairing here. Messages sent via "send" go to "receive", 
without requiring an ack (although if we use the transport layer, an implicit 
ack can be sent). Messages sent via "sendWithReply" go to "receiveAndReply", 
and the caller needs to explicitly handle the reply.

In most cases, an RPC endpoint only needs to receive messages without replying, 
and thus can override just receive.




> Standardize internal RPC interface
> --
>
> Key: SPARK-5124
> URL: https://issues.apache.org/jira/browse/SPARK-5124
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Reporter: Reynold Xin
>Assignee: Shixiong Zhu
> Attachments: Pluggable RPC - draft 1.pdf, Pluggable RPC - draft 2.pdf
>
>
> In Spark we use Akka as the RPC layer. It would be great if we can 
> standardize the internal RPC interface to facilitate testing. This will also 
> provide the foundation to try other RPC implementations in the future.


