I'm writing a an ETL process to read event level data from a product database, transform / aggregate it and write to to an analytics data warehouse. I'm using clojure's core.async library to separate these process into concurrently executing components. Here's what the main part of my code looks like right now
(ns data-staging.main (:require [clojure.core.async :as async]) (:use [clojure.core.match :only (match)] [data-staging.map-vecs] [data-staging.tables]) (:gen-class)) (def submissions (make-table "Submission" "Valid")) (def photos (make-table "Photo")) (def videos (make-table "Video")) (def votes (make-table "Votes")) ;; define channels used for sequential data processing (def chan-in (async/chan 100)) (def chan-out (async/chan 100)) (defn write-thread [table] "infinitely loops between reading subsequent 10000 rows from table and ouputting a vector of the rows(maps) into 'chan-in'" (while true (let [next-rows (get-rows table)] (async/>!! chan-in next-rows) (set-max table (:max-id (last next-rows)))))) (defn aggregator [] "takes output from 'chan-in' and aggregates it by coupon_id, date. then adds / drops any fields that are needed / not needed and inputs into 'chan-out'" (while true (->> (async/<!! chan-in) aggregate (async/>!! chan-out)))) (defn read-thread [] "reads data from chan out and interts into Analytics DB" (while true (upsert (async/<!! chan-out)))) (defn -main [] (async/thread (write-thread submissions)) (async/thread (write-thread photos)) (async/thread (write-thread videos)) (async/thread-call aggregator) (async/thread-call read-thread)) As you can see, I'm putting each os component on to its own thread and using the blocking >!! call on the channels. It feels like using the non-blocking >! calls along with go routines might be better for this use case, especially for the database reads which spend most of their time performing i/o and waiting for new rows in the product db. Is this the case, and if so, what would be the best way to implement it? I'm a little unclear on all the tradeoffs between the two methods and on exactly how to effectively use go routines. Also any other suggestions on how to improve the overall architecture would be much appreciated! -- -- You received this message because you are subscribed to the Google Groups "Clojure" group. To post to this group, send email to clojure@googlegroups.com Note that posts from new members are moderated - please be patient with your first post. To unsubscribe from this group, send email to clojure+unsubscr...@googlegroups.com For more options, visit this group at http://groups.google.com/group/clojure?hl=en --- You received this message because you are subscribed to the Google Groups "Clojure" group. To unsubscribe from this group and stop receiving emails from it, send an email to clojure+unsubscr...@googlegroups.com. For more options, visit https://groups.google.com/groups/opt_out.