[
https://issues.apache.org/jira/browse/SAMZA-316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14057097#comment-14057097
]
Chris Riccomini commented on SAMZA-316:
---------------------------------------
bq. How is a client request routed to the right container? (Assuming Samza is
running in a YARN cluster, we have no way of knowing the IP address and port
number of a container a priori. Also, the endpoint may change as containers
fail and are restarted.)
This could be discovered by talking to the job's AM. The AM will know which
containers are running and on which hosts they were allocated.
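As a sketch of how a client might use that information: suppose the AM exposed some endpoint returning a container-to-"host:port" mapping (no such endpoint exists today; the format and class names below are purely illustrative). The client could then route a keyed request the same way Samza partitions input, by hashing the key:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical client-side helper. Assumes the AM exposes (somehow, e.g. via
// its tracking URL) a mapping of container ID to "host:port". Nothing here is
// a real Samza or YARN API; it only illustrates routing once that mapping is
// known.
public class ContainerLocator {
  private final Map<Integer, String> containerToEndpoint = new HashMap<>();

  // Parse a hypothetical AM response like "0=host1:8080,1=host2:8080".
  public ContainerLocator(String amResponse) {
    for (String entry : amResponse.split(",")) {
      String[] kv = entry.split("=");
      containerToEndpoint.put(Integer.parseInt(kv[0]), kv[1]);
    }
  }

  // Route a request key to a container by hashing, mirroring (in simplified
  // form) how Samza assigns partitions to containers.
  public String endpointFor(Object key) {
    int container = Math.abs(key.hashCode() % containerToEndpoint.size());
    return containerToEndpoint.get(container);
  }

  public static void main(String[] args) {
    ContainerLocator locator = new ContainerLocator("0=host1:8080,1=host2:8080");
    System.out.println(locator.endpointFor("x"));
  }
}
```

The AM would also need to push updated mappings (or clients would re-query) when containers fail and restart elsewhere, per the point quoted above.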
bq. For (2) I propose the following: a network service is implemented as just
another system (i.e. implementing SystemFactory) in Samza. Every incoming
request from a client is wrapped in an IncomingMessageEnvelope, and goes
through the same MessageChooser flow as everything else. This ensures that
StreamTask remains single-threaded and easy to reason about.
Interesting. This is really creative. I like that it isolates this feature
outside of samza-core, so people that don't use it don't end up pulling in a
ton of Jetty dependencies (or whatever). I have been really reluctant to add an
HTTP server to SamzaContainer or TaskInstances because I think it clutters
everything up. This is a really nice way to get the functionality without
cluttering anything. In fact, I think no changes need to be made to Samza,
right?
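To make the shape of this concrete, here is a rough sketch of the consumer half of such a system. The types are simplified stand-ins, not Samza's real SystemConsumer/IncomingMessageEnvelope API: server I/O threads enqueue requests, and the container's single-threaded run loop drains them via poll(), so the StreamTask never sees concurrency.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Sketch of the proposal: the network server is "just another system", so
// requests flow through the normal envelope/poll path. All names below are
// simplified stand-ins for the real Samza interfaces.
public class NetworkSystemSketch {

  // Stand-in for IncomingMessageEnvelope. The "stream" is a unique
  // per-connection name, so the task can correlate requests with replies.
  static class Envelope {
    final String stream;
    final byte[] message;
    Envelope(String stream, byte[] message) { this.stream = stream; this.message = message; }
  }

  // Stand-in for a SystemConsumer backing an HTTP/WebSocket/Thrift server.
  static class NetworkConsumer {
    private final BlockingQueue<Envelope> pending = new LinkedBlockingQueue<>();
    private long nextConnectionId = 0;

    // Called from the server's I/O threads.
    String onConnect() { return "conn-" + (nextConnectionId++); }
    void onRequest(String connectionStream, byte[] body) {
      pending.add(new Envelope(connectionStream, body));
    }

    // Called only by the container's single-threaded run loop, like poll().
    List<Envelope> poll(long timeoutMs) {
      List<Envelope> batch = new ArrayList<>();
      try {
        Envelope first = pending.poll(timeoutMs, TimeUnit.MILLISECONDS);
        if (first != null) { batch.add(first); pending.drainTo(batch); }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      return batch;
    }
  }

  public static void main(String[] args) {
    NetworkConsumer consumer = new NetworkConsumer();
    String conn = consumer.onConnect();
    consumer.onRequest(conn, "GET /state/key1".getBytes());
    for (Envelope e : consumer.poll(100)) {
      System.out.println(e.stream + ": " + new String(e.message));
    }
  }
}
```

The queue hand-off is the whole trick: the MessageChooser and run loop stay unchanged, and the only multi-threaded code lives inside the network system's own package.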
bq. If a container is shut down, all of its clients will be disconnected and
will need to reconnect. I don't think we need to worry about preserving
connections across container restarts.
Agree.
bq. Point (1) is a service discovery problem
Agree. Regardless of which solution is picked, it can, again, be isolated from
the rest of Samza by having the SystemFactory's consumer register itself with
whatever discovery mechanism is chosen. I've been reluctant to add a direct ZK
dependency to SamzaContainer/TaskInstance, and have thus far avoided it (we
have indirect dependencies through Kafka, when it's used).
I have shied away from this feature because I couldn't figure out a way to do
it that wasn't intrusive to the code base. Your proposal seems like a really
clean way to isolate this feature in its own package and module away from
everything else.
> Allow jobs to expose network service endpoints
> ----------------------------------------------
>
> Key: SAMZA-316
> URL: https://issues.apache.org/jira/browse/SAMZA-316
> Project: Samza
> Issue Type: New Feature
> Components: container
> Affects Versions: 0.7.0
> Reporter: Martin Kleppmann
>
> At the moment, the only way of contacting a task instance in a Samza job is
> to send a message to one of the job's input streams. Sometimes it would be
> useful to be able to contact a task instance directly, as a TCP network
> service. Example use cases:
> - Allowing a remote client to query a Samza job's state store (which may, in
> some cases, obviate the need for writing job output into a separate, publicly
> readable database).
> - Allowing a client to "tap into" a stream without consuming the entire
> stream. For example, a client may wish to be notified about all log messages
> matching a particular regular expression as long as they are connected (the
> notifications stop when they disconnect).
> - Performing an expensive on-demand computation quickly by parallelizing it
> across many tasks in one or more Samza jobs, like in Storm's [distributed
> RPC|https://storm.incubator.apache.org/documentation/Distributed-RPC.html].
> These use cases can be implemented by running a network server (e.g. an HTTP
> server, a WebSocket server, a Thrift RPC server, etc.) in each Samza
> container. We then need to solve two problems:
> # How is a client request routed to the right container? (Assuming Samza is
> running in a YARN cluster, we have no way of knowing the IP address and port
> number of a container a priori. Also, the endpoint may change as containers
> fail and are restarted.)
> # How are requests from remote clients integrated with Samza's StreamTask
> programming model? (Requests may arrive at any time; responses may be
> delivered immediately or asynchronously at some later time; depending on the
> protocol, there may be a stream of requests and responses on a single client
> connection.)
> Point (1) is a service discovery problem, for which there are many possible
> solutions, but no one obvious winner. [Helix|http://helix.apache.org/] could
> be used for this (although Helix also provides various cluster management
> features that we probably wouldn't need). YARN is considering integrating a
> service discovery mechanism (YARN-913). [Finagle
> ServerSets|http://stevenskelton.ca/finagle-serverset-clusters-using-zookeeper/]
> and [Rest.li D2|https://github.com/linkedin/rest.li/wiki/Dynamic-Discovery]
> both use Zookeeper for service discovery. Looking beyond the end of our
> JVM-based nose, projects like [Consul|http://www.consul.io/],
> [SkyDNS|http://blog.gopheracademy.com/skydns] and
> [etcd|https://github.com/coreos/etcd] also provide service discovery. It
> would be worth surveying the landscape and figuring out the various pros and
> cons before settling on one particular service discovery mechanism. In
> particular, we should keep in mind the needs of clients that are not written
> in a JVM-based language.
> Whatever service discovery solution is chosen, we will need to decide whether
> to use a separate TCP port for each TaskInstance within a container, or
> whether to use some application-level mechanism for deciding which
> TaskInstance should process a particular incoming request.
> For (2) I propose the following: a network service is implemented as just
> another system (i.e. implementing SystemFactory) in Samza. Every incoming
> request from a client is wrapped in an IncomingMessageEnvelope, and goes
> through the same MessageChooser flow as everything else. This ensures that
> StreamTask remains single-threaded and easy to reason about.
> Each connection from a client is given a unique stream name. This allows the
> StreamTask to tell which requests came from the same client. In order to send
> a response to a client, the StreamTask sends an OutgoingMessageEnvelope to an
> output stream, using the same system and stream name as the incoming request.
> This means that a StreamTask can generate a response immediately if it wants
> to, or it could send a response asynchronously at some later point in time.
> It also works for protocols that can have many requests and responses on a
> long-lived connection, e.g. WebSocket.
> Special incoming messages can be used to indicate that a client has connected
> or disconnected (allowing cleanup of any per-client information in the
> StreamTask), and a special outgoing message can be used to tell the network
> service to disconnect a particular client.
> If a container is shut down, all of its clients will be disconnected and will
> need to reconnect. I don't think we need to worry about preserving
> connections across container restarts.
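The per-connection stream naming and response routing described in the quoted proposal can be sketched as follows. These types are simplified stand-ins, not Samza's real StreamTask/envelope API: a reply is routed by reusing the incoming system and stream name, and special CONNECT/DISCONNECT messages manage per-client state.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the request/response flow from the proposal above. All names are
// illustrative stand-ins, not the real Samza API.
public class RequestResponseTaskSketch {
  enum Kind { CONNECT, REQUEST, DISCONNECT }

  // Stand-ins for IncomingMessageEnvelope / OutgoingMessageEnvelope.
  static class In {
    final Kind kind; final String system, stream, body;
    In(Kind k, String sys, String st, String b) { kind = k; system = sys; stream = st; body = b; }
  }
  static class Out {
    final String system, stream, body;
    Out(String sys, String st, String b) { system = sys; stream = st; body = b; }
  }

  // Per-client state, keyed by connection stream name, cleaned up on DISCONNECT.
  private final Map<String, Integer> requestsPerClient = new HashMap<>();

  Out process(In envelope) {
    switch (envelope.kind) {
      case CONNECT:
        requestsPerClient.put(envelope.stream, 0);
        return null;
      case DISCONNECT:
        requestsPerClient.remove(envelope.stream);
        return null;
      default:
        int n = requestsPerClient.merge(envelope.stream, 1, Integer::sum);
        // Reply uses the same system and stream as the request, so the
        // network system can deliver it on the right client connection,
        // immediately or asynchronously later.
        return new Out(envelope.system, envelope.stream, "reply #" + n + " to " + envelope.body);
    }
  }

  public static void main(String[] args) {
    RequestResponseTaskSketch task = new RequestResponseTaskSketch();
    task.process(new In(Kind.CONNECT, "net", "conn-7", null));
    Out reply = task.process(new In(Kind.REQUEST, "net", "conn-7", "ping"));
    System.out.println(reply.stream + " <- " + reply.body);
  }
}
```

Because the reply carries only a system/stream pair, the same pattern covers one-shot HTTP responses and long-lived WebSocket-style exchanges with many requests and replies per connection.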
--
This message was sent by Atlassian JIRA
(v6.2#6252)