Thanks for that Larry but I think this is a bit of overkill for my
scenario. The code I pasted is almost verbatim what we have in our
production codebase, so the ability to queue new jobs etc is really not
needed. Cheers though.
On Thursday, September 18, 2014 9:38:47 AM UTC+10, larry google groups
wrote:
>
>
> > We don't have streams of data here, the long running tasks have
> side-effects. I would
> > prefer to avoid adding another whole framework just to run a few long
> running jobs in p//.
>
>
> I guess I should show you some code, so you can see how simple this is.
> I'll copy-and-paste some code that I use.
>
> One simple way I use Lamina is to save stuff to a database. I don't want
> the "save" action happening in my main thread, so I put the data in a
> channel, and I let some workers pull that data off the channel and put it
> in the database. So what follows is the whole file, this about 30 lines of
> code, including some try/catch stuff that you probably don't need:
>
> (ns loupi.persistence-queue
> (:require
> [loupi.persistence :as persistence]
> [slingshot.slingshot :as ss]
> [lamina.core :as lamina]))
>
> (def ^:private persistence-channel (lamina/channel))
>
> (defn persist-this-item [context-wrapper-for-database-call]
> (lamina/enqueue persistence-channel
> (fn []
> (ss/try+
> (persistence/make-consistent
> context-wrapper-for-database-call)
> (catch Object o (ss/throw+ {:type
> :loupi.supervisor/problem
> :message "Error in
> persistence-queue/persist-this-itme."
> :data o}))))))
>
> (defn worker []
> (loop [closure-with-item-inside @(lamina/read-channel
> persistence-channel)]
> (ss/try+
> (closure-with-item-inside)
> (catch Object o (ss/throw+ {:type :loupi.supervisor/problem
> :message "Error in
> persistence-queue/worker."
> :closure closure-with-item-inside
> :data o})))
> (recur @(lamina/read-channel persistence-channel))))
>
> (defn start-workers []
> (dotimes [_ 6]
> (println "Starting up the persist queue workers.")
> (future (worker))))
>
>
>
> I call (start-workers) when the app starts. When I save something to the
> database, I call (persist-this-item) and I put a closure on the channel.
> The workers eventually grab that closure and execute it.
>
> Clearly, that closure can do whatever you like. To borrow from your
> original example, that closure is where you would put:
>
> (long-running-widget-processor widget)
>
>
>
>
>
>
>
>
> On Tuesday, September 16, 2014 10:00:07 PM UTC-4, Beau Fabry wrote:
>>
>> We don't have streams of data here, the long running tasks have
>> side-effects. I would prefer to avoid adding another whole framework just
>> to run a few long running jobs in p//.
>>
>> I have a list of jobs to do, I'm partitioning that list up into 4 sub
>> lists to be worked through by 4 p// workers, I then want to block and wait
>> until all 4 workers have finished their tasks.
>>
>> On Wednesday, September 17, 2014 3:27:07 AM UTC+10, larry google groups
>> wrote:
>>>
>>>
>>> This does not look correct to me. Perhaps someone else has more insight
>>> into this. I am suspicious about 2 things:
>>>
>>> 1.) your use of doall
>>>
>>> 2.) your use of (thread)
>>>
>>> It looks to me like you are trying to hack together a kind of pipeline
>>> or channel. Clojure has a wealth of libraries that can handle that for you.
>>> The main thing you are trying to do is this:
>>>
>>> (long-running-widget-processor widget))
>>>
>>>
>>> You go to some trouble to set up workers, all to ensure that
>>> long-running-widget-processor
>>> is handled in its own thread.
>>>
>>> I would suggest you look at Lamina:
>>>
>>> https://github.com/ztellman/lamina
>>>
>>> In particular, look at pipelines:
>>>
>>> https://github.com/ztellman/lamina/wiki/Pipelines
>>>
>>>
>>>
>>>
>>>
>>> On Friday, September 5, 2014 1:46:02 AM UTC-4, Beau Fabry wrote:
>>>>
>>>> Is the kinda ugly constant (doall usage a sign that I'm doing
>>>> something silly?
>>>>
>>>> (let [num-workers 4
>>>> widgets-per-worker (inc (int (/ (count widgets) num-workers)))
>>>> bucketed-widgets (partition-all widgets-per-worker widgets)
>>>> workers (doall (map (fn [widgets]
>>>> (thread
>>>> (doseq [widget widgets]
>>>> (long-running-widget-processor widget))
>>>> true))
>>>> bucketed-widgets))]
>>>> (doall (map <!! workers)))
>>>>
>>>> https://gist.github.com/bfabry/ad830b1888e4fc550f88
>>>>
>>>> All comments appreciated :-)
>>>>
>>>> Cheers,
>>>> Beau
>>>>
>>>
--
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to [email protected]
Note that posts from new members are moderated - please be patient with your
first post.
To unsubscribe from this group, send email to
[email protected]
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en
---
You received this message because you are subscribed to the Google Groups
"Clojure" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
For more options, visit https://groups.google.com/d/optout.