TL;DR: How do you use Component when the application logic involves retrying failed components?

Background: I'm writing an app that consumes events from a streaming HTTP connection and writes those events to a message queue (see Code Illustration #1). It seems like that could be captured easily with three components (an HTTP stream, a message queue connection, and a "shoveler" that depends on the other two; see Code Illustration #2), *but* the reconnection requirements complicate things.

The HTTP connection may be closed at any time by the server; if that happens, the app should persistently attempt to reconnect using an exponential back-off pattern. In addition, if the app goes thirty seconds without receiving any data, it should close the connection and try to reconnect (see Code Illustration #3).

It's not clear to me how best to express these "retry" requirements in the component lifecycle. Like, is it blasphemous for a component to be calling stop and start on its injected dependencies?

Some possible approaches:

- Throw different kinds of exceptions to indicate what should happen (using namespaced keywords, perhaps?), handled by whoever calls component/start on the system-map.
  - The exception provides the component and system at the time of the exception, enabling a sort of "resume" capability.
  - I'm under the impression that relying on exceptions for control flow is an anti-pattern.
- Create a sort of custom system implementation, one that goes beyond calling start on its components in dependency order to monitor failures and direct retries "appropriately".
  - "A system is a component which knows how to start and stop other components." (from the README) So the fact that we want the shoveler component to be capable of restarting the HTTP component indicates that the shoveler should actually be considered a system. (right?)
  - If the HTTP stream is *injected* into the shoveler as a dependency, how is it possible for the shoveler to stop the HTTP stream and then start it again *with* any dependencies the stream may have?
- Ensure that every component/Lifecycle method implementation is idempotent, so that I can get good-enough "restart" semantics by just calling start-system again.
  - I know that idempotence is generally a Good Thing anyway, but using start-system as a panacea strikes me as crude.

Code Illustrations:

1. Rough sketch of app without timeout/retry logic or component:

    ;; In all of these sketches, `io` is an alias for clojure.java.io.
    (defn -main []
      (let [mq-conn (connect-to-queue mq-config)
            {event-stream :body} (http/get endpoint {:as :stream})]
        (with-open [rdr (io/reader event-stream)]
          (doseq [entity (line-seq rdr)]
            (write mq-conn entity)))))

2. Rough sketch of app with component but still without timeout/retry logic:

    (defrecord EventStream [endpoint http-config stream]
      component/Lifecycle
      (start [this]
        (let [response (http/get endpoint (merge {:as :stream} http-config))]
          (assoc this :http-response response, :stream (:body response))))
      (stop [this]
        (.close stream)
        (-> this (dissoc :http-response) (assoc :stream nil))))

    (defrecord MessageQueue [config connection]
      component/Lifecycle
      (start [this]
        (assoc this :connection (connect-to-queue config)))
      (stop [this]
        (.close connection)
        (assoc this :connection nil)))

    (defrecord Shoveler [source sink worker]
      component/Lifecycle
      (start [this]
        ;; To avoid blocking indefinitely, we put the processing in a future.
        (assoc this :worker
               (future
                 (with-open [rdr (io/reader (:stream source))]
                   (doseq [entity (line-seq rdr)]
                     ;; `sink` is the MessageQueue component; write to its connection.
                     (write (:connection sink) entity))))))
      (stop [this]
        (future-cancel worker)
        (assoc this :worker nil)))

    (defn -main []
      (-> (component/system-map
            :config (read-config)
            :events (map->EventStream {:endpoint endpoint})
            :mq-client (map->MessageQueue {})
            :shoveler (map->Shoveler {}))
          ;; system-using attaches dependency metadata to each component in the
          ;; system; plain `using` would attach it to the system map itself.
          (component/system-using {:events {:http-config :config}
                                   :mq-client {:config :config}
                                   :shoveler {:source :events
                                              :sink :mq-client}})
          component/start))

3. Rough sketch of desired *production* behavior, not using Component:

    (defn -main []
      (let [mq-conn (connect-to-queue mq-config)]
        ;; Ideally, the app is *always* ready to receive incoming events
        ;; and put them into the queue.
        (while true
          (try
            (let [{event-stream :body}
                  (loop [conn-timeout 1000]
                    ;; `recur` is not allowed inside `catch`, so the `catch`
                    ;; returns nil and the retry happens in the `or` instead.
                    (or (try
                          (http/get endpoint {:as :stream
                                              :conn-timeout conn-timeout
                                              :socket-timeout 30000})
                          (catch java.net.SocketTimeoutException e
                            ;; An upper limit; 32 seconds was arbitrarily chosen.
                            ;; ex-info stands in for some more appropriate exception type.
                            (when (> conn-timeout 32000)
                              (throw (ex-info "Service unavailable. Human attention needed."
                                              {:conn-timeout conn-timeout}
                                              e)))))
                        (recur (* 2 conn-timeout))))]
              (with-open [rdr (io/reader event-stream)]
                (doseq [entity (line-seq rdr)]
                  (write mq-conn entity))))
            (catch java.net.SocketTimeoutException e
              (log/warn e "Didn't receive any data for thirty seconds. Reconnecting."))
            (catch java.net.SocketException e
              (log/warn e "Server closed the connection. Reconnecting."))
            ;; Any other exceptions *will* escape the retry loop.
            ))))

--
You received this message because you are subscribed to the Google Groups "Clojure" group.
To post to this group, send email to clojure@googlegroups.com
Note that posts from new members are moderated - please be patient with your first post.
To unsubscribe from this group, send email to clojure+unsubscr...@googlegroups.com
For more options, visit this group at http://groups.google.com/group/clojure?hl=en
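
P.S. For reference, the exponential back-off part of Code Illustration #3 can be pulled out into a standalone helper, which sidesteps the "no `recur` inside `catch`" problem. This is only a minimal sketch under assumptions: the name `retry-with-backoff` and the option keys (`:initial-ms`, `:max-ms`, `:retryable?`, `:sleep!`) are hypothetical, it sleeps between attempts rather than widening the connection timeout, and it does not answer where such a loop should live in a Component system.

```clojure
(defn retry-with-backoff
  "Calls (attempt delay-ms), doubling delay-ms after each retryable failure.
  Rethrows the exception once delay-ms has reached max-ms, or when the
  exception is not retryable. All option names are hypothetical."
  [attempt {:keys [initial-ms max-ms retryable? sleep!]
            :or   {initial-ms 1000
                   max-ms     32000
                   retryable? (constantly true)
                   sleep!     #(Thread/sleep (long %))}}]
  (loop [delay-ms initial-ms]
    ;; `recur` cannot appear inside `catch`, so the `try` returns a tagged
    ;; map and the decision to retry is made outside of it.
    (let [outcome (try
                    {:ok (attempt delay-ms)}
                    (catch Exception e
                      (if (and (retryable? e) (< delay-ms max-ms))
                        {:retry e}
                        (throw e))))]
      (if (contains? outcome :retry)
        (do (sleep! delay-ms)
            (recur (* 2 delay-ms)))
        (:ok outcome)))))

;; Usage with a simulated connection that fails twice before succeeding.
;; :sleep! is swapped for a recorder so the example does not actually wait.
(def attempts (atom []))
(def sleeps (atom []))

(defn flaky-connect [delay-ms]
  (swap! attempts conj delay-ms)
  (if (< (count @attempts) 3)
    (throw (java.net.SocketTimeoutException. "simulated timeout"))
    :connected))

(retry-with-backoff flaky-connect
                    {:retryable? #(instance? java.net.SocketTimeoutException %)
                     :sleep!     #(swap! sleeps conj %)})
;; returns :connected; @attempts is [1000 2000 4000], @sleeps is [1000 2000]
```

In the app itself, `attempt` would presumably wrap the `http/get` call, for example by passing `delay-ms` through as the connection timeout the way Code Illustration #3 does.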