Martin Kleppmann created SAMZA-316:
--------------------------------------

             Summary: Allow jobs to expose network service endpoints
                 Key: SAMZA-316
                 URL: https://issues.apache.org/jira/browse/SAMZA-316
             Project: Samza
          Issue Type: New Feature
          Components: container
    Affects Versions: 0.7.0
            Reporter: Martin Kleppmann


At the moment, the only way of contacting a task instance in a Samza job is to 
send a message to one of the job's input streams. Sometimes it would be useful 
to be able to contact a task instance directly, as a TCP network service. 
Example use cases:

- Allowing a remote client to query a Samza job's state store (which may, in 
some cases, obviate the need for writing job output into a separate, publicly 
readable database).
- Allowing a client to "tap into" a stream without consuming the entire stream. 
For example, a client may wish to be notified about all log messages matching a 
particular regular expression as long as they are connected (the notifications 
stop when they disconnect).
- Performing an expensive on-demand computation quickly by parallelizing it 
across many tasks in one or more Samza jobs, like in Storm's [distributed 
RPC|https://storm.incubator.apache.org/documentation/Distributed-RPC.html].

These use cases can be implemented by running a network server (e.g. an HTTP 
server, a WebSocket server, a Thrift RPC server, etc.) in each Samza container. 
We then need to solve two problems:

# How is a client request routed to the right container? (Assuming Samza is 
running in a YARN cluster, we have no way of knowing the IP address and port 
number of a container a priori. Also, the endpoint may change as containers 
fail and are restarted.)
# How are requests from remote clients integrated with Samza's StreamTask 
programming model? (Requests may arrive at any time; responses may be delivered 
immediately or asynchronously at some later time; depending on the protocol, 
there may be a stream of requests and responses on a single client connection.)

Point (1) is a service discovery problem, for which there are many possible 
solutions, but no one obvious winner. [Helix|http://helix.apache.org/] could be 
used for this (although Helix also provides various cluster management features 
that we probably wouldn't need). YARN is considering integrating a service 
discovery mechanism (YARN-913). [Finagle 
ServerSets|http://stevenskelton.ca/finagle-serverset-clusters-using-zookeeper/] 
and [Rest.li D2|https://github.com/linkedin/rest.li/wiki/Dynamic-Discovery] 
both use ZooKeeper for service discovery. Looking beyond the end of our 
JVM-based nose, projects like [Consul|http://www.consul.io/], 
[SkyDNS|http://blog.gopheracademy.com/skydns] and 
[etcd|https://github.com/coreos/etcd] also provide service discovery. It would 
be worth surveying the landscape and figuring out the various pros and cons 
before settling on one particular service discovery mechanism. In particular, 
we should keep in mind the needs of clients that are not written in a JVM-based 
language.
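To make the discovery requirements concrete, here is a minimal sketch of the contract any of these options would need to satisfy. It assumes a ZooKeeper-style model of ephemeral registrations: a container registers its (host, port) once its server socket is bound, the registration vanishes when the container dies, and clients always look endpoints up rather than caching them. ServiceRegistry, Endpoint, register, expireSession and lookup are hypothetical names, not APIs of Helix, ZooKeeper, Consul or any other system listed above. The in-memory map only stands in for a real replicated store.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical endpoint record: where one container's network service listens.
final class Endpoint {
    final String containerId;
    final String host;
    final int port;

    Endpoint(String containerId, String host, int port) {
        this.containerId = containerId;
        this.host = host;
        this.port = port;
    }

    @Override
    public String toString() {
        return containerId + "@" + host + ":" + port;
    }
}

// Hypothetical in-memory stand-in for a discovery service with ephemeral
// registrations (e.g. ZooKeeper ephemeral nodes): an entry lives only as long
// as the container that created it.
final class ServiceRegistry {
    // jobName -> (containerId -> endpoint)
    private final Map<String, Map<String, Endpoint>> jobs = new ConcurrentHashMap<>();

    // Called by a container after it has bound its server socket on whatever
    // host and port YARN gave it.
    void register(String jobName, Endpoint endpoint) {
        jobs.computeIfAbsent(jobName, k -> new ConcurrentHashMap<>())
            .put(endpoint.containerId, endpoint);
    }

    // Simulates a container failing: its ephemeral registration disappears.
    void expireSession(String jobName, String containerId) {
        Map<String, Endpoint> containers = jobs.get(jobName);
        if (containers != null) {
            containers.remove(containerId);
        }
    }

    // Called by clients: the job's endpoints as of now, never an a-priori address.
    List<Endpoint> lookup(String jobName) {
        return new ArrayList<>(jobs.getOrDefault(jobName, Map.of()).values());
    }
}
```

When YARN restarts a failed container it may land on a different host and port, so the new container simply registers again, and clients re-run the lookup after their connection drops.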

Whatever service discovery solution is chosen, we will need to decide whether 
to use a separate TCP port for each TaskInstance within a container, or whether 
to use some application-level mechanism for deciding which TaskInstance should 
process a particular incoming request.
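If we go with a single port per container, the application-level mechanism could look like the following sketch. It assumes each request names the key it concerns, and that the container can reproduce the partitioning function used for the job's input streams. ContainerDispatcher, partitionFor and the NOT_HERE reply are hypothetical, and the hash-modulo partitioner is an assumption, not Samza's or Kafka's actual partitioner.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical application-level routing: one TCP port per container. Each
// request names a key; the container maps the key to a partition and hands
// the request to the TaskInstance that owns that partition.
final class ContainerDispatcher {
    private final int totalPartitions;
    // partition id -> handler standing in for a TaskInstance in this container
    private final Map<Integer, Function<String, String>> tasks = new HashMap<>();

    ContainerDispatcher(int totalPartitions) {
        this.totalPartitions = totalPartitions;
    }

    void addTask(int partition, Function<String, String> handler) {
        tasks.put(partition, handler);
    }

    // Assumed partitioning function; a real deployment would have to match
    // whatever partitioner produced the job's input streams.
    static int partitionFor(String key, int totalPartitions) {
        return Math.floorMod(key.hashCode(), totalPartitions);
    }

    String dispatch(String key) {
        int partition = partitionFor(key, totalPartitions);
        Function<String, String> task = tasks.get(partition);
        if (task == null) {
            // The partition is owned by another container; the client should
            // consult service discovery again.
            return "NOT_HERE partition=" + partition;
        }
        return task.apply(key);
    }
}
```

With one port per TaskInstance, the same key-to-partition step would instead run on the client, which would then look up the endpoint registered for that partition. The trade-off is more registrations and open ports versus a routing convention that every client must implement.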

For (2) I propose the following: a network service is implemented as just 
another system (i.e. implementing SystemFactory) in Samza. Every incoming 
request from a client is wrapped in an IncomingMessageEnvelope, and goes 
through the same MessageChooser flow as everything else. This ensures that 
StreamTask remains single-threaded and easy to reason about.
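Here is a minimal sketch of the consumer half of such a network system. It assumes the server's I/O threads only enqueue requests, and that the container's existing run loop is the only thread that polls. RequestEnvelope is a simplified stand-in for IncomingMessageEnvelope (the real class also carries a partition, offset and key). NetworkSystemConsumer, onRequest and poll are hypothetical and deliberately do not reproduce the exact SystemConsumer interface. The MessageChooser step, which would pick between this system and the job's other input systems, is omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Simplified stand-in for IncomingMessageEnvelope: only what this sketch needs.
final class RequestEnvelope {
    final String system;
    final String stream;   // one stream name per client connection
    final String message;

    RequestEnvelope(String system, String stream, String message) {
        this.system = system;
        this.stream = stream;
        this.message = message;
    }
}

// Hypothetical consumer side of a "network" system. Network I/O threads call
// onRequest(); only the container's run-loop thread calls poll(), so the
// StreamTask never sees concurrent calls.
final class NetworkSystemConsumer {
    private final String systemName;
    private final BlockingQueue<RequestEnvelope> pending = new LinkedBlockingQueue<>();

    NetworkSystemConsumer(String systemName) {
        this.systemName = systemName;
    }

    // Safe to call from any network thread when a request arrives on a connection.
    void onRequest(String connectionStream, String payload) {
        pending.add(new RequestEnvelope(systemName, connectionStream, payload));
    }

    // Called by the run loop: waits up to timeoutMs for the first request,
    // then drains whatever else has already arrived.
    List<RequestEnvelope> poll(long timeoutMs) throws InterruptedException {
        List<RequestEnvelope> batch = new ArrayList<>();
        RequestEnvelope first = pending.poll(timeoutMs, TimeUnit.MILLISECONDS);
        if (first != null) {
            batch.add(first);
            pending.drainTo(batch);
        }
        return batch;
    }
}
```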

Each connection from a client is given a unique stream name. This allows the 
StreamTask to tell which requests came from the same client. In order to send a 
response to a client, the StreamTask sends an OutgoingMessageEnvelope to an 
output stream, using the same system and stream name as the incoming request. 
This means that a StreamTask can generate a response immediately if it wants 
to, or it could send a response asynchronously at some later point in time. It 
also works for protocols that can have many requests and responses on a 
long-lived connection, e.g. WebSocket.
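The per-connection stream name can be illustrated with a small sketch. It assumes the producer side of the network system keeps a map from stream name to open connection. ResponseEnvelope stands in for OutgoingMessageEnvelope, and NetworkSystemProducer, LookupTask and the "get"/"watch" request syntax are all hypothetical. A "get" is answered immediately from the task's local store. A "watch" is answered later, when an update arrives, by sending to the stream name remembered from the original request.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-in for OutgoingMessageEnvelope.
final class ResponseEnvelope {
    final String system;
    final String stream;   // same stream name as the request => same client
    final String message;

    ResponseEnvelope(String system, String stream, String message) {
        this.system = system;
        this.stream = stream;
        this.message = message;
    }
}

// Hypothetical producer side: the stream name selects the client connection.
final class NetworkSystemProducer {
    // stream name -> messages "written to the socket" (stand-in for a real connection)
    private final Map<String, List<String>> connections = new HashMap<>();

    void connectionOpened(String stream) {
        connections.put(stream, new ArrayList<>());
    }

    void send(ResponseEnvelope envelope) {
        List<String> socket = connections.get(envelope.stream);
        if (socket != null) { // the client may already have disconnected
            socket.add(envelope.message);
        }
    }

    List<String> written(String stream) {
        return connections.getOrDefault(stream, List.of());
    }
}

// Hypothetical task: immediate responses for "get", deferred ones for "watch".
final class LookupTask {
    private final Map<String, String> store = new HashMap<>();
    private final Map<String, List<String>> watchers = new HashMap<>(); // key -> streams

    void process(String system, String stream, String request, NetworkSystemProducer out) {
        String[] parts = request.split(" ", 2);
        if (parts.length < 2) {
            return; // malformed request; ignored in this sketch
        }
        if (parts[0].equals("get")) {
            out.send(new ResponseEnvelope(system, stream, store.getOrDefault(parts[1], "(none)")));
        } else if (parts[0].equals("watch")) {
            watchers.computeIfAbsent(parts[1], k -> new ArrayList<>()).add(stream);
        }
    }

    // E.g. triggered by a message on an ordinary input stream: asynchronous responses.
    void update(String system, String key, String value, NetworkSystemProducer out) {
        store.put(key, value);
        for (String stream : watchers.getOrDefault(key, List.of())) {
            out.send(new ResponseEnvelope(system, stream, key + "=" + value));
        }
    }
}
```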

Special incoming messages can be used to indicate that a client has connected 
or disconnected (allowing cleanup of any per-client information in the 
StreamTask), and a special outgoing message can be used to tell the network 
service to disconnect a particular client.
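Applied to the log "tap" use case, this gives roughly the following sketch. The proposal does not specify how the special messages are encoded, so the Kind enum (CONNECTED, REQUEST, DISCONNECTED), the disconnect flag on Outgoing, and all class names are assumptions. The task keeps one regex per connected client and forwards matching log lines. It drops the subscription on disconnect, and it asks the service to close a client that sends an invalid pattern.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

// Hypothetical kinds of message the network system could deliver.
enum Kind { CONNECTED, REQUEST, DISCONNECTED }

// Hypothetical outgoing message: data for a client, or (disconnect == true)
// an instruction to the network service to close that client's connection.
final class Outgoing {
    final String stream;
    final String message;
    final boolean disconnect;

    Outgoing(String stream, String message, boolean disconnect) {
        this.stream = stream;
        this.message = message;
        this.disconnect = disconnect;
    }
}

// Log "tap": each client registers a regex and receives matching log lines
// only while it is connected.
final class LogTapTask {
    private final Map<String, Pattern> subscriptions = new HashMap<>(); // stream -> filter
    final List<Outgoing> sent = new ArrayList<>(); // stand-in for the system producer

    void onNetworkMessage(Kind kind, String stream, String payload) {
        switch (kind) {
            case CONNECTED:
                break; // nothing to do until the client sends its filter
            case REQUEST:
                try {
                    subscriptions.put(stream, Pattern.compile(payload));
                } catch (PatternSyntaxException e) {
                    sent.add(new Outgoing(stream, "bad regex", false));
                    sent.add(new Outgoing(stream, null, true)); // ask the service to drop the client
                }
                break;
            case DISCONNECTED:
                subscriptions.remove(stream); // per-client cleanup
                break;
        }
    }

    // Called for each message on the job's ordinary log input stream.
    void onLogLine(String line) {
        for (Map.Entry<String, Pattern> e : subscriptions.entrySet()) {
            if (e.getValue().matcher(line).find()) {
                sent.add(new Outgoing(e.getKey(), line, false));
            }
        }
    }

    int subscriberCount() {
        return subscriptions.size();
    }
}
```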

If a container is shut down, all of its clients will be disconnected and will 
need to reconnect. I don't think we need to worry about preserving connections 
across container restarts.



--
This message was sent by Atlassian JIRA
(v6.2#6252)
