I'm writing an ETL process to read event-level data from a product
database, transform / aggregate it, and write it to an analytics data
warehouse. I'm using Clojure's core.async library to separate these
processes into concurrently executing components. Here's what the main
part of my code looks like right now:

    (ns data-staging.main
      (:require [clojure.core.async :as async]
                [clojure.core.match :refer [match]]
                [data-staging.map-vecs :refer :all]
                [data-staging.tables :refer :all])
      (:gen-class))

    (def submissions (make-table "Submission" "Valid"))
    (def photos (make-table "Photo"))
    (def videos (make-table "Video"))
    (def votes (make-table "Votes"))

    ;; define channels used for sequential data processing
    (def chan-in (async/chan 100))
    (def chan-out (async/chan 100))

    ;; docstrings go before the argument vector; placed after it they are
    ;; just evaluated and discarded, not attached as documentation
    (defn write-thread
      "Infinitely loops between reading the next 10000 rows from
      table and putting a vector of the rows (maps) onto chan-in."
      [table]
      (while true
        (let [next-rows (get-rows table)]
          (async/>!! chan-in next-rows)
          (set-max table (:max-id (last next-rows))))))

    (defn aggregator
      "Takes output from chan-in and aggregates it by coupon_id and date,
      then adds / drops any fields that are / aren't needed and puts the
      result onto chan-out."
      []
      (while true
        (->> (async/<!! chan-in)
             aggregate
             (async/>!! chan-out))))

    (defn read-thread
      "Reads data from chan-out and inserts it into the Analytics DB."
      []
      (while true
        (upsert (async/<!! chan-out))))

    (defn -main []
      (async/thread (write-thread submissions))
      (async/thread (write-thread photos))
      (async/thread (write-thread videos))
      (async/thread-call aggregator)
      (async/thread-call read-thread))

As you can see, I'm putting each component on its own thread and using
the blocking >!! and <!! calls on the channels. It feels like using the
parking >! and <! calls inside go blocks might be better for this use
case, especially for the database reads, which spend most of their time
performing I/O and waiting for new rows in the product DB. Is this the
case, and if so, what would be the best way to implement it? I'm a little
unclear on the tradeoffs between the two approaches and on exactly how to
use go blocks effectively. Any other suggestions on how to improve the
overall architecture would also be much appreciated!
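A rough sketch of how the thread / go split could look, assuming a
reasonably recent core.async and with everything database-related stubbed
out. The names producer, aggregator, writer, aggregate-batch and run-etl,
the data-staging.sketch namespace, and the {:coupon-id :date :n} row shape
are all hypothetical stand-ins, and Thread/sleep plays the part of a
blocking JDBC query. The idea it illustrates: go blocks run on a small
fixed-size thread pool (8 threads by default) and only park on channel
operations (<!, >!); a blocking DB call inside a go block still ties up
one of those pool threads. So the I/O-bound reads and upserts stay on
async/thread, while the pure, CPU-only aggregation step can live in a go
block. Each stage also closes its output channel once its input is
exhausted, so the whole pipeline shuts down instead of looping forever.

```clojure
(ns data-staging.sketch
  (:require [clojure.core.async :as async]))

;; Hypothetical row shape: {:coupon-id 1 :date "2013-07-01" :n 1}

(defn aggregate-batch
  "Hypothetical stand-in for `aggregate`: sums :n per [coupon-id date]
  within one batch. Pure and CPU-only."
  [rows]
  (->> rows
       (group-by (juxt :coupon-id :date))
       (map (fn [[[coupon-id date] rs]]
              {:coupon-id coupon-id
               :date      date
               :n         (reduce + (map :n rs))}))
       vec))

(defn producer
  "Runs on a real thread because the (simulated) DB read blocks.
  Puts each batch onto `out`, then closes `out` when the source is done."
  [batches out]
  (async/thread
    (doseq [batch batches]
      (Thread/sleep 10)              ; stand-in for a blocking DB query
      (async/>!! out batch))
    (async/close! out)))

(defn aggregator
  "CPU-only work, so a go block is fine: <! and >! park rather than block
  a thread. Closes `out` once `in` is closed and drained."
  [in out]
  (async/go-loop []
    (if-let [rows (async/<! in)]
      (do (async/>! out (aggregate-batch rows))
          (recur))
      (async/close! out))))

(defn writer
  "Drains `in` on a real thread, since upserts block on I/O. Returns a
  channel that yields the number of aggregated rows written."
  [in upsert!]
  (async/thread
    (loop [written 0]
      (if-let [batch (async/<!! in)]
        (do (upsert! batch)
            (recur (+ written (count batch))))
        written))))

(defn run-etl
  "Wires one producer per source into a single aggregator and writer,
  and blocks until everything has been written."
  [sources upsert!]
  (let [source-chans (vec (repeatedly (count sources) #(async/chan 10)))
        merged       (async/merge source-chans 10) ; closes when all sources close
        aggregated   (async/chan 10)]
    (doseq [[batches ch] (map vector sources source-chans)]
      (producer batches ch))
    (aggregator merged aggregated)
    (async/<!! (writer aggregated upsert!))))
```

Called as (run-etl [table-a-batches table-b-batches] upsert-fn), it returns
the number of aggregated rows written once every source has been drained.
If aggregation ever becomes expensive, async/pipeline could replace the
single go-loop to run that step several ways in parallel.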

-- 
-- 
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to clojure@googlegroups.com
Note that posts from new members are moderated - please be patient with your 
first post.
To unsubscribe from this group, send email to
clojure+unsubscr...@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en