[ 
https://issues.apache.org/jira/browse/SAMZA-316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martin Kleppmann updated SAMZA-316:
-----------------------------------
    Labels: project  (was: )

> Allow jobs to expose network service endpoints
> ----------------------------------------------
>
>                 Key: SAMZA-316
>                 URL: https://issues.apache.org/jira/browse/SAMZA-316
>             Project: Samza
>          Issue Type: New Feature
>          Components: container
>    Affects Versions: 0.7.0
>            Reporter: Martin Kleppmann
>              Labels: project
>
> At the moment, the only way of contacting a task instance in a Samza job is 
> to send a message to one of the job's input streams. Sometimes it would be 
> useful to be able to contact a task instance directly, as a TCP network 
> service. Example use cases:
> - Allowing a remote client to query a Samza job's state store (which may, in 
> some cases, obviate the need for writing job output into a separate, publicly 
> readable database).
> - Allowing a client to "tap into" a stream without consuming the entire 
> stream. For example, a client may wish to be notified about all log messages 
> matching a particular regular expression as long as they are connected (the 
> notifications stop when they disconnect).
> - Performing an expensive on-demand computation quickly by parallelizing it 
> across many tasks in one or more Samza jobs, like in Storm's [distributed 
> RPC|https://storm.incubator.apache.org/documentation/Distributed-RPC.html].
> These use cases can be implemented by running a network server (e.g. an HTTP 
> server, a WebSocket server, a Thrift RPC server, etc.) in each Samza 
> container. We then need to solve two problems:
> # How is a client request routed to the right container? (Assuming Samza is 
> running in a YARN cluster, we have no way of knowing the IP address and port 
> number of a container a priori. Also, the endpoint may change as containers 
> fail and are restarted.)
> # How are requests from remote clients integrated with Samza's StreamTask 
> programming model? (Requests may arrive at any time; responses may be 
> delivered immediately or asynchronously at some later time; depending on the 
> protocol, there may be a stream of requests and responses on a single client 
> connection.)
> Point (1) is a service discovery problem, for which there are many possible 
> solutions, but no one obvious winner. [Helix|http://helix.apache.org/] could 
> be used for this (although Helix also provides various cluster management 
> features that we probably wouldn't need). YARN is considering integrating a 
> service discovery mechanism (YARN-913). [Finagle 
> ServerSets|http://stevenskelton.ca/finagle-serverset-clusters-using-zookeeper/]
>  and [Rest.li D2|https://github.com/linkedin/rest.li/wiki/Dynamic-Discovery] 
> both use Zookeeper for service discovery. Looking beyond the end of our 
> JVM-based nose, projects like [Consul|http://www.consul.io/], 
> [SkyDNS|http://blog.gopheracademy.com/skydns] and 
> [etcd|https://github.com/coreos/etcd] also provide service discovery. It 
> would be worth surveying the landscape and figuring out the various pros and 
> cons before settling on one particular service discovery mechanism. In 
> particular, we should keep in mind the needs of clients that are not written 
> in a JVM-based language.
> Whatever service discovery solution is chosen, we will need to decide whether 
> to use a separate TCP port for each TaskInstance within a container, or 
> whether to use some application-level mechanism for deciding which 
> TaskInstance should process a particular incoming request.
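> Whichever backend wins the survey above, the container-side contract is the same: register an endpoint once the host and port are known, look it up from clients, and unregister on shutdown. A minimal sketch of that contract (interface and names are hypothetical; a real implementation would sit on ZooKeeper ephemeral nodes, as Helix, ServerSets and D2 do, rather than an in-memory map):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the registration/lookup contract a container would
// use to advertise its endpoint after YARN assigns it a host and port.
interface ServiceRegistry {
  void register(String jobName, int containerId, String hostPort);
  String lookup(String jobName, int containerId);
  void unregister(String jobName, int containerId);
}

// In-memory stand-in, useful only to illustrate the call pattern. A real
// backend would make entries ephemeral so they vanish when a container dies.
class InMemoryServiceRegistry implements ServiceRegistry {
  private final Map<String, String> entries = new ConcurrentHashMap<>();

  private String key(String jobName, int containerId) {
    return jobName + "/" + containerId;
  }

  public void register(String jobName, int containerId, String hostPort) {
    entries.put(key(jobName, containerId), hostPort);
  }

  public String lookup(String jobName, int containerId) {
    return entries.get(key(jobName, containerId));
  }

  public void unregister(String jobName, int containerId) {
    entries.remove(key(jobName, containerId));
  }
}
```

> Keying by (job, container) rather than (job, task) corresponds to the one-port-per-container option; the per-TaskInstance-port option would simply add the task to the key.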
> For (2) I propose the following: a network service is implemented as just 
> another system (i.e. implementing SystemFactory) in Samza. Every incoming 
> request from a client is wrapped in an IncomingMessageEnvelope, and goes 
> through the same MessageChooser flow as everything else. This ensures that 
> StreamTask remains single-threaded and easy to reason about.
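> A minimal sketch of that flow, using stand-in classes rather than Samza's real SystemConsumer/IncomingMessageEnvelope APIs: network I/O threads enqueue decoded requests, and only the task thread dequeues them, so the StreamTask never sees concurrent calls:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Minimal stand-in for Samza's envelope type, just to show the shape of the
// proposal: every client request becomes an ordinary message with a system
// name and a per-connection stream name.
class RequestEnvelope {
  final String system;   // e.g. "http" -- the network service acting as a system
  final String stream;   // unique per client connection
  final Object message;  // the decoded request body

  RequestEnvelope(String system, String stream, Object message) {
    this.system = system;
    this.stream = stream;
    this.message = message;
  }
}

// Plays the role of a SystemConsumer: network threads call offer(), the task
// loop calls poll(), so task code stays single-threaded.
class NetworkSystemConsumer {
  private final BlockingQueue<RequestEnvelope> queue = new LinkedBlockingQueue<>();

  void offer(String connectionId, Object request) {
    queue.offer(new RequestEnvelope("http", "conn-" + connectionId, request));
  }

  RequestEnvelope poll() {
    return queue.poll(); // null when no request is pending
  }
}
```

> A bounded queue here would additionally give back-pressure to clients when the task falls behind, which the MessageChooser flow makes natural.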
> Each connection from a client is given a unique stream name. This allows the 
> StreamTask to tell which requests came from the same client. In order to send 
> a response to a client, the StreamTask sends an OutgoingMessageEnvelope to an 
> output stream, using the same system and stream name as the incoming request. 
> This means that a StreamTask can generate a response immediately if it wants 
> to, or it could send a response asynchronously at some later point in time. 
> It also works for protocols that can have many requests and responses on a 
> long-lived connection, e.g. WebSocket.
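> A sketch of that response path (class and method names are illustrative, and a StringBuilder stands in for the client socket): because the stream name identifies the connection, the task can reply immediately, later, or many times on the same connection:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch: an outgoing message addressed to the same
// (system, stream) pair as the incoming request is routed back to the client
// connection that the stream name identifies.
class Responder {
  // stream name -> stand-in for the client connection (here, a reply buffer)
  private final Map<String, StringBuilder> connections = new HashMap<>();

  void onConnect(String stream) {
    connections.put(stream, new StringBuilder());
  }

  // The task's "send": may be called at any time after the request arrives,
  // and any number of times, supporting long-lived protocols like WebSocket.
  void send(String stream, String response) {
    StringBuilder conn = connections.get(stream);
    if (conn != null) {
      conn.append(response); // a real system would write to the socket
    }
  }

  String sentTo(String stream) {
    return connections.get(stream).toString();
  }
}
```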
> Special incoming messages can be used to indicate that a client has connected 
> or disconnected (allowing cleanup of any per-client information in the 
> StreamTask), and a special outgoing message can be used to tell the network 
> service to disconnect a particular client.
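> The lifecycle handling could look roughly like this (all types are hypothetical stand-ins, with an enum playing the role of the special connect/disconnect messages); the point is that per-client state lives entirely inside the single-threaded task and is cleaned up on disconnect:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the lifecycle handling described above: special messages mark a
// client connecting or disconnecting, so the task can allocate and release
// per-client state without any extra threads or callbacks.
class ClientLifecycleTask {
  enum Control { CONNECT, DISCONNECT }

  // per-client state, keyed by the connection's stream name
  private final Map<String, Integer> requestCounts = new HashMap<>();

  void process(String stream, Object message) {
    if (message == Control.CONNECT) {
      requestCounts.put(stream, 0);
    } else if (message == Control.DISCONNECT) {
      requestCounts.remove(stream); // clean up per-client state
    } else if (requestCounts.containsKey(stream)) {
      requestCounts.merge(stream, 1, Integer::sum);
    }
  }

  Integer count(String stream) {
    return requestCounts.get(stream);
  }
}
```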
> If a container is shut down, all of its clients will be disconnected and will 
> need to reconnect. I don't think we need to worry about preserving 
> connections across container restarts.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
