Martin Kleppmann created SAMZA-316:
--------------------------------------
Summary: Allow jobs to expose network service endpoints
Key: SAMZA-316
URL: https://issues.apache.org/jira/browse/SAMZA-316
Project: Samza
Issue Type: New Feature
Components: container
Affects Versions: 0.7.0
Reporter: Martin Kleppmann
At the moment, the only way of contacting a task instance in a Samza job is to
send a message to one of the job's input streams. Sometimes it would be useful
to be able to contact a task instance directly, as a TCP network service.
Example use cases:
- Allowing a remote client to query a Samza job's state store (which may, in
some cases, obviate the need for writing job output into a separate, publicly
readable database).
- Allowing a client to "tap into" a stream without consuming the entire stream.
For example, a client may wish to be notified about all log messages matching a
particular regular expression as long as they are connected (the notifications
stop when they disconnect).
- Performing an expensive on-demand computation quickly by parallelizing it
across many tasks in one or more Samza jobs, as in Storm's
[distributed RPC|https://storm.incubator.apache.org/documentation/Distributed-RPC.html].
These use cases can be implemented by running a network server (e.g. an HTTP
server, a WebSocket server, a Thrift RPC server, etc.) in each Samza container.
We then need to solve two problems:
# How is a client request routed to the right container? (Assuming Samza is
running in a YARN cluster, we have no way of knowing the IP address and port
number of a container a priori. Also, the endpoint may change as containers
fail and are restarted.)
# How are requests from remote clients integrated with Samza's StreamTask
programming model? (Requests may arrive at any time; responses may be delivered
immediately or asynchronously at some later time; depending on the protocol,
there may be a stream of requests and responses on a single client connection.)
Point (1) is a service discovery problem, for which there are many possible
solutions, but no one obvious winner. [Helix|http://helix.apache.org/] could be
used for this (although Helix also provides various cluster management features
that we probably wouldn't need). YARN is considering integrating a service
discovery mechanism (YARN-913).
[Finagle ServerSets|http://stevenskelton.ca/finagle-serverset-clusters-using-zookeeper/]
and [Rest.li D2|https://github.com/linkedin/rest.li/wiki/Dynamic-Discovery]
both use ZooKeeper for service discovery. Looking beyond the end of our
JVM-based nose, projects like [Consul|http://www.consul.io/],
[SkyDNS|http://blog.gopheracademy.com/skydns] and
[etcd|https://github.com/coreos/etcd] also provide service discovery. It would
be worth surveying the landscape and figuring out the various pros and cons
before settling on one particular service discovery mechanism. In particular,
we should keep in mind the needs of clients that are not written in a JVM-based
language.
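As a rough illustration of the contract that any of these mechanisms would need to provide, here is a minimal in-memory sketch. Each container registers its current endpoint once its server is listening, a restarted container overwrites its old registration, and clients resolve the endpoint at request time rather than a priori. The class and method names (Endpoint, EndpointRegistry, register, deregister, resolve) and the "<job>/<container id>" key format are hypothetical. A real implementation would back this with ZooKeeper ephemeral nodes, Consul, etcd or similar, and would also need to notify clients when an endpoint changes.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical endpoint record: where a container is listening right now.
final class Endpoint {
    final String host;
    final int port;

    Endpoint(String host, int port) {
        this.host = host;
        this.port = port;
    }

    @Override
    public String toString() {
        return host + ":" + port;
    }
}

// Hypothetical in-memory stand-in for a service discovery backend.
// Keys are "<job name>/<container id>"; values are the container's current endpoint.
final class EndpointRegistry {
    private final Map<String, Endpoint> endpoints = new ConcurrentHashMap<>();

    // Called by a container once its network server is bound.
    // A restarted container simply overwrites its previous registration.
    void register(String job, int containerId, Endpoint endpoint) {
        endpoints.put(job + "/" + containerId, endpoint);
    }

    // Called when a container shuts down. With ZooKeeper, an ephemeral node
    // would also disappear automatically if the container's session ends.
    void deregister(String job, int containerId) {
        endpoints.remove(job + "/" + containerId);
    }

    // Clients resolve the endpoint at request time instead of hard-coding it.
    Optional<Endpoint> resolve(String job, int containerId) {
        return Optional.ofNullable(endpoints.get(job + "/" + containerId));
    }
}
```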
Whatever service discovery solution is chosen, we will need to decide whether
to use a separate TCP port for each TaskInstance within a container, or whether
to use some application-level mechanism for deciding which TaskInstance should
process a particular incoming request.
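The application-level option could look roughly like this. Each container listens on a single port, each request carries a key, and the container maps the key to a partition, and hence to the TaskInstance that owns that partition, analogous to key-based partitioning of a stream. This is a sketch under assumptions: TaskRouter, the string task names and the hash-modulo scheme are hypothetical and do not reflect how Samza itself assigns partitions to tasks.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical application-level router: one listening port per container,
// with each request mapped to a TaskInstance by a key carried in the request.
final class TaskRouter {
    private final int partitionCount;
    // Which task instance (by name) owns each partition in this container.
    private final Map<Integer, String> partitionToTask = new HashMap<>();

    TaskRouter(int partitionCount) {
        this.partitionCount = partitionCount;
    }

    void assign(int partition, String taskName) {
        partitionToTask.put(partition, taskName);
    }

    // Hash the key and take it modulo the partition count.
    // Math.floorMod keeps the result non-negative even for negative hash codes.
    int partitionFor(String key) {
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    // Returns the owning task, or null if that partition lives in another
    // container (the client then needs service discovery to find the right one).
    String taskFor(String key) {
        return partitionToTask.get(partitionFor(key));
    }
}
```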
For (2) I propose the following: a network service is implemented as just
another system (i.e. implementing SystemFactory) in Samza. Every incoming
request from a client is wrapped in an IncomingMessageEnvelope, and goes
through the same MessageChooser flow as everything else. This ensures that
StreamTask remains single-threaded and easy to reason about.
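A minimal sketch of the threading model this implies: network I/O threads never call into the StreamTask. They only enqueue requests, and the container's single run loop collects them with poll() alongside its other consumers, after which the MessageChooser decides the processing order. Envelope and NetworkConsumer are hypothetical stand-ins for IncomingMessageEnvelope and a SystemConsumer; the real Samza interfaces have different signatures and work in terms of SystemStreamPartition rather than plain stream names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for IncomingMessageEnvelope: the stream name
// identifies the client connection, the message is the request payload.
final class Envelope {
    final String stream;
    final String message;

    Envelope(String stream, String message) {
        this.stream = stream;
        this.message = message;
    }
}

// Hypothetical stand-in for a network-backed SystemConsumer. Network I/O threads
// only enqueue; the container's single run loop is the only caller of poll().
final class NetworkConsumer {
    private final BlockingQueue<Envelope> pending = new LinkedBlockingQueue<>();

    // Invoked from network I/O threads whenever a request arrives.
    void onRequest(String connectionStream, String payload) {
        pending.add(new Envelope(connectionStream, payload));
    }

    // Invoked by the run loop: waits up to timeoutMs for the first envelope,
    // then returns it together with anything else already queued.
    List<Envelope> poll(long timeoutMs) throws InterruptedException {
        List<Envelope> batch = new ArrayList<>();
        Envelope first = pending.poll(timeoutMs, TimeUnit.MILLISECONDS);
        if (first != null) {
            batch.add(first);
            pending.drainTo(batch);
        }
        return batch;
    }
}
```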
Each connection from a client is given a unique stream name. This allows the
StreamTask to tell which requests came from the same client. In order to send a
response to a client, the StreamTask sends an OutgoingMessageEnvelope to an
output stream, using the same system and stream name as the incoming request.
This means that a StreamTask can generate a response immediately if it wants
to, or it could send a response asynchronously at some later point in time. It
also works for protocols that can have many requests and responses on a
long-lived connection, e.g. WebSocket.
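The bookkeeping for per-connection stream names might look like the following sketch. Accepting a connection allocates a fresh stream name, and sending a message to that stream name writes it back to the same client, whether the task replies straight away or much later. ConnectionTable, the "conn-N" naming scheme and the use of a Consumer<String> as the client writer are hypothetical; in a real system the writer would wrap a socket or WebSocket channel, and send() would sit behind a SystemProducer.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical connection table for a network system. Each accepted connection
// gets a unique stream name; an outgoing message addressed to that stream name
// is written back to the same connection.
final class ConnectionTable {
    private final AtomicLong nextId = new AtomicLong();
    // Stream name -> function that writes a response to the client.
    private final Map<String, Consumer<String>> connections = new ConcurrentHashMap<>();

    // Called when a client connects; returns the stream name used for its requests.
    String accept(Consumer<String> clientWriter) {
        String stream = "conn-" + nextId.incrementAndGet();
        connections.put(stream, clientWriter);
        return stream;
    }

    // Plays the role of a producer's send(): route the response by stream name.
    // Returns false if the client has already disconnected.
    boolean send(String stream, String response) {
        Consumer<String> writer = connections.get(stream);
        if (writer == null) {
            return false;
        }
        writer.accept(response);
        return true;
    }

    // Called when the client disconnects (or the task asks for a disconnect).
    void close(String stream) {
        connections.remove(stream);
    }
}
```

Because routing depends only on the stream name, a reply sent after the client has gone away is simply dropped rather than delivered to a different connection.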
Special incoming messages can be used to indicate that a client has connected
or disconnected (allowing cleanup of any per-client information in the
StreamTask), and a special outgoing message can be used to tell the network
service to disconnect a particular client.
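To show how the connect and disconnect messages keep per-client state tidy, here is a sketch of the "tap into a stream" use case from the list above. A client sends a regular expression on its connection stream, matching log lines are routed to every subscribed client, and the disconnect message drops that client's subscription. LogTapTask and the __connected__ / __disconnected__ marker strings are hypothetical, and the special outgoing disconnect message is not modelled here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical task for the "tap into a log stream" use case. Messages on a
// client's stream are either control events or a regex subscription; messages on
// the log stream are matched against every connected client's pattern.
final class LogTapTask {
    static final String CONNECTED = "__connected__";       // hypothetical control message
    static final String DISCONNECTED = "__disconnected__"; // hypothetical control message

    // Per-client state, keyed by the client's stream name.
    private final Map<String, Pattern> subscriptions = new HashMap<>();

    void onClientMessage(String clientStream, String message) {
        if (message.equals(CONNECTED)) {
            return; // nothing to set up until the client sends a pattern
        }
        if (message.equals(DISCONNECTED)) {
            subscriptions.remove(clientStream); // clean up per-client state
            return;
        }
        subscriptions.put(clientStream, Pattern.compile(message));
    }

    // Handles a line from the ordinary log input stream and returns the client
    // streams that should receive it (each would become an outgoing message).
    List<String> onLogLine(String line) {
        List<String> recipients = new ArrayList<>();
        for (Map.Entry<String, Pattern> e : subscriptions.entrySet()) {
            if (e.getValue().matcher(line).find()) {
                recipients.add(e.getKey());
            }
        }
        return recipients;
    }
}
```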
If a container is shut down, all of its clients will be disconnected and will
need to reconnect. I don't think we need to worry about preserving connections
across container restarts.