[
https://issues.apache.org/jira/browse/SAMZA-316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14049328#comment-14049328
]
Yan Fang commented on SAMZA-316:
--------------------------------
{quote}
For (2) I propose the following: a network service is implemented as just
another system (i.e. implementing SystemFactory) in Samza. Every incoming
request from a client is wrapped in an IncomingMessageEnvelope, and goes
through the same MessageChooser flow as everything else. This ensures that
StreamTask remains single-threaded and easy to reason about.
......
Special incoming messages can be used to indicate that a client has connected
or disconnected (allowing cleanup of any per-client information in the
StreamTask), and a special outgoing message can be used to tell the network
service to disconnect a particular client.
{quote}
Another advantage of this approach is that, with an almost identical mechanism,
we could expose the status of a stream by sending metrics or metadata to a
special outgoing stream. My only concern is whether this would hurt
performance when requests (incoming messages) and responses (outgoing
messages) are very frequent.
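A minimal sketch of the quoted flow, assuming simplified stand-in types rather than Samza's actual classes. A network I/O thread only enqueues messages, including the proposed connect/disconnect control messages. A single task thread drains the queue, so per-client state needs no locking. The names `SingleThreadedLoop`, `Kind`, `NetworkRequest` and `CountingTask` are hypothetical; in Samza the queue's role would be played by the network system's consumer and the MessageChooser.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class SingleThreadedLoop {
    // Hypothetical message kinds: ordinary requests plus the proposed
    // "client connected" and "client disconnected" control messages.
    enum Kind { CONNECT, REQUEST, DISCONNECT }

    // Hypothetical stand-in for IncomingMessageEnvelope; the stream name
    // identifies which client connection the message came from.
    static final class NetworkRequest {
        final String stream;
        final Kind kind;
        final String payload;

        NetworkRequest(String stream, Kind kind, String payload) {
            this.stream = stream;
            this.kind = kind;
            this.payload = payload;
        }
    }

    // Hypothetical task whose per-client state is only touched by the single
    // task thread, so it needs no locks.
    static final class CountingTask {
        final Map<String, Integer> requestsPerClient = new HashMap<>();
        final List<String> responses = new ArrayList<>();

        void process(NetworkRequest msg) {
            switch (msg.kind) {
                case CONNECT:
                    requestsPerClient.put(msg.stream, 0);
                    break;
                case REQUEST:
                    int n = requestsPerClient.merge(msg.stream, 1, Integer::sum);
                    responses.add(msg.stream + ": " + msg.payload + " #" + n);
                    break;
                case DISCONNECT:
                    // Clean up per-client state when the client goes away.
                    requestsPerClient.remove(msg.stream);
                    break;
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<NetworkRequest> queue = new LinkedBlockingQueue<>();
        CountingTask task = new CountingTask();

        // A network I/O thread only enqueues; it never calls the task directly.
        Thread ioThread = new Thread(() -> {
            queue.add(new NetworkRequest("client-a", Kind.CONNECT, null));
            queue.add(new NetworkRequest("client-a", Kind.REQUEST, "hello"));
            queue.add(new NetworkRequest("client-a", Kind.DISCONNECT, null));
        });
        ioThread.start();
        ioThread.join();

        // The single task thread (here: main) drains messages in arrival order.
        NetworkRequest msg;
        while ((msg = queue.poll()) != null) {
            task.process(msg);
        }
        System.out.println(task.responses);          // [client-a: hello #1]
        System.out.println(task.requestsPerClient);  // {}
    }
}
```

The performance concern above is visible in this shape: every request and response passes through one thread, so throughput is bounded by how quickly that thread can process messages.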
> Allow jobs to expose network service endpoints
> ----------------------------------------------
>
> Key: SAMZA-316
> URL: https://issues.apache.org/jira/browse/SAMZA-316
> Project: Samza
> Issue Type: New Feature
> Components: container
> Affects Versions: 0.7.0
> Reporter: Martin Kleppmann
>
> At the moment, the only way of contacting a task instance in a Samza job is
> to send a message to one of the job's input streams. Sometimes it would be
> useful to be able to contact a task instance directly, as a TCP network
> service. Example use cases:
> - Allowing a remote client to query a Samza job's state store (which may, in
> some cases, obviate the need for writing job output into a separate, publicly
> readable database).
> - Allowing a client to "tap into" a stream without consuming the entire
> stream. For example, a client may wish to be notified about all log messages
> matching a particular regular expression as long as they are connected (the
> notifications stop when they disconnect).
> - Performing an expensive on-demand computation quickly by parallelizing it
> across many tasks in one or more Samza jobs, like in Storm's [distributed
> RPC|https://storm.incubator.apache.org/documentation/Distributed-RPC.html].
> These use cases can be implemented by running a network server (e.g. an HTTP
> server, a WebSocket server, a Thrift RPC server, etc.) in each Samza
> container. We then need to solve two problems:
> # How is a client request routed to the right container? (Assuming Samza is
> running in a YARN cluster, we have no way of knowing the IP address and port
> number of a container a priori. Also, the endpoint may change as containers
> fail and are restarted.)
> # How are requests from remote clients integrated with Samza's StreamTask
> programming model? (Requests may arrive at any time; responses may be
> delivered immediately or asynchronously at some later time; depending on the
> protocol, there may be a stream of requests and responses on a single client
> connection.)
> Point (1) is a service discovery problem, for which there are many possible
> solutions, but no one obvious winner. [Helix|http://helix.apache.org/] could
> be used for this (although Helix also provides various cluster management
> features that we probably wouldn't need). YARN is considering integrating a
> service discovery mechanism (YARN-913). [Finagle
> ServerSets|http://stevenskelton.ca/finagle-serverset-clusters-using-zookeeper/]
> and [Rest.li D2|https://github.com/linkedin/rest.li/wiki/Dynamic-Discovery]
> both use ZooKeeper for service discovery. Looking beyond the end of our
> JVM-based nose, projects like [Consul|http://www.consul.io/],
> [SkyDNS|http://blog.gopheracademy.com/skydns] and
> [etcd|https://github.com/coreos/etcd] also provide service discovery. It
> would be worth surveying the landscape and figuring out the various pros and
> cons before settling on one particular service discovery mechanism. In
> particular, we should keep in mind the needs of clients that are not written
> in a JVM-based language.
> Whatever service discovery solution is chosen, we will need to decide whether
> to use a separate TCP port for each TaskInstance within a container, or
> whether to use some application-level mechanism for deciding which
> TaskInstance should process a particular incoming request.
> For (2) I propose the following: a network service is implemented as just
> another system (i.e. implementing SystemFactory) in Samza. Every incoming
> request from a client is wrapped in an IncomingMessageEnvelope, and goes
> through the same MessageChooser flow as everything else. This ensures that
> StreamTask remains single-threaded and easy to reason about.
> Each connection from a client is given a unique stream name. This allows the
> StreamTask to tell which requests came from the same client. In order to send
> a response to a client, the StreamTask sends an OutgoingMessageEnvelope to an
> output stream, using the same system and stream name as the incoming request.
> This means that a StreamTask can generate a response immediately if it wants
> to, or it could send a response asynchronously at some later point in time.
> It also works for protocols that can have many requests and responses on a
> long-lived connection, e.g. WebSocket.
> Special incoming messages can be used to indicate that a client has connected
> or disconnected (allowing cleanup of any per-client information in the
> StreamTask), and a special outgoing message can be used to tell the network
> service to disconnect a particular client.
> If a container is shut down, all of its clients will be disconnected and will
> need to reconnect. I don't think we need to worry about preserving
> connections across container restarts.
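A sketch of the per-connection stream naming described in the issue, under stated assumptions. `Address` is a hypothetical (system, stream) pair standing in for Samza's SystemStream. `NetworkSystem` hands each new connection a unique stream name. `ReplyingTask` answers either immediately or later by sending to the same address the request came from. `flush()` is a hypothetical stand-in for a periodic callback such as Samza's WindowableTask.window().

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

public class PerConnectionStreams {
    // Hypothetical (system, stream) pair, similar in spirit to SystemStream.
    static final class Address {
        final String system;
        final String stream;

        Address(String system, String stream) {
            this.system = system;
            this.stream = stream;
        }
    }

    // Hypothetical network "system": each connection gets a unique stream name.
    static final class NetworkSystem {
        static final String NAME = "net"; // assumed system name
        private final AtomicLong nextId = new AtomicLong();
        // Messages delivered to each client, keyed by its stream name.
        final Map<String, List<String>> sentToClient = new LinkedHashMap<>();

        Address accept() {
            String stream = "conn-" + nextId.incrementAndGet();
            sentToClient.put(stream, new ArrayList<>());
            return new Address(NAME, stream);
        }

        // Route an outgoing message to the connection that owns the stream name.
        void send(Address to, String message) {
            List<String> inbox = sentToClient.get(to.stream);
            if (inbox != null) {
                inbox.add(message); // unknown/closed connection: message dropped
            }
        }
    }

    // Hypothetical task: answers "ping" immediately, defers everything else.
    static final class ReplyingTask {
        private final NetworkSystem net;
        private final List<Runnable> deferred = new ArrayList<>();

        ReplyingTask(NetworkSystem net) {
            this.net = net;
        }

        // The response goes to the same (system, stream) the request came from.
        void process(Address from, String request) {
            if (request.equals("ping")) {
                net.send(from, "pong");
            } else {
                deferred.add(() -> net.send(from, "done: " + request));
            }
        }

        // Sends all deferred (asynchronous) responses.
        void flush() {
            deferred.forEach(Runnable::run);
            deferred.clear();
        }
    }

    public static void main(String[] args) {
        NetworkSystem net = new NetworkSystem();
        ReplyingTask task = new ReplyingTask(net);
        Address a = net.accept();
        Address b = net.accept();
        task.process(a, "ping");
        task.process(b, "report");
        task.process(a, "ping");
        task.flush();
        System.out.println(net.sentToClient);
        // {conn-1=[pong, pong], conn-2=[done: report]}
    }
}
```

Because the response address is just the request's (system, stream) pair, the task does not need to hold a socket or callback; in this sketch a response to a client that has already disconnected is simply dropped.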
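The issue leaves open whether to use a separate TCP port per TaskInstance or an application-level mechanism for picking the TaskInstance. One possible application-level mechanism, shown only as an assumption-laden sketch, is to hash a key carried in the request (for example the state-store key being queried) onto the container's tasks. `TaskRouter` and the task names are hypothetical, and this only reaches the right state if that state was partitioned with the same hash.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TaskRouter {
    // Hypothetical names of the TaskInstances hosted by one container.
    private final List<String> taskNames;

    public TaskRouter(List<String> taskNames) {
        if (taskNames.isEmpty()) {
            throw new IllegalArgumentException("container hosts no tasks");
        }
        this.taskNames = new ArrayList<>(taskNames);
    }

    // The same key always maps to the same task. It reaches the task holding
    // that key's state only if the state was partitioned with this same hash
    // (an assumption of this sketch).
    public String route(String requestKey) {
        int index = Math.floorMod(requestKey.hashCode(), taskNames.size());
        return taskNames.get(index);
    }

    public static void main(String[] args) {
        TaskRouter router = new TaskRouter(Arrays.asList("task-0", "task-1", "task-2"));
        String first = router.route("user-42");
        String second = router.route("user-42");
        System.out.println(first.equals(second)); // true
    }
}
```

Compared with one port per TaskInstance, this keeps a single listening port per container, at the cost of every request having to carry a routable key.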
--
This message was sent by Atlassian JIRA
(v6.2#6252)