Clojure's async is built around the opinion that you, the programmer,
should be required to think about what sort of buffer you want to have on
your channel, and think about what should happen if that buffer overflows.
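To make those overflow choices concrete, here is a small sketch (mine, not
taken from the code under discussion) that offers five values to channels
with a fixed, a dropping, and a sliding buffer of size 3.  The helper names
`drain` and `survivors` are made up for illustration.

```clojure
(require '[clojure.core.async :as async
           :refer [chan dropping-buffer sliding-buffer offer! close! <!!]])

;; Hypothetical helper: close the channel and collect whatever is buffered.
(defn drain [ch]
  (close! ch)
  (<!! (async/into [] ch)))

;; Hypothetical helper: offer 0..4 without ever blocking, then see what
;; is still sitting in the buffer.
(defn survivors [ch]
  (doseq [i (range 5)]
    (offer! ch i))
  (drain ch))

(def fixed    (survivors (chan 3)))                    ; full buffer refuses further offers
(def dropping (survivors (chan (dropping-buffer 3))))  ; newest values are discarded
(def sliding  (survivors (chan (sliding-buffer 3))))   ; oldest values are discarded
```

The fixed and dropping buffers both end up holding 0, 1, 2, while the
sliding buffer holds 2, 3, 4.  The difference between the first two only
shows up with a parking or blocking put: on a full fixed buffer it waits,
on a dropping buffer it completes immediately and the value is thrown away.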

Your code spins off 5000 little go blocks that are each trying to write to
a channel of size 1.  This effectively forces Clojure to queue up all
those pending puts.  Clojure deliberately caps that queue at 1024 pending
puts per channel, and throws an error once you exceed it, so that you
can't easily circumvent its opinionated policy that *you* should be
supplying the right size buffer with the appropriate policy.
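If you want to see that limit in isolation, the following sketch (an
illustration of mine, not your original code) uses `put!` on an unbuffered
channel that nobody reads from.  The name `overflow-error` is just a
placeholder for the captured message.

```clojure
(require '[clojure.core.async :refer [chan put!]])

(def ch (chan))   ; unbuffered, and nothing ever takes from it

;; The first 1024 puts are simply queued as pending puts.
(dotimes [i 1024]
  (put! ch i))

;; One more pending put trips the limit and throws an AssertionError.
(def overflow-error
  (try
    (put! ch :one-too-many)
    nil
    (catch AssertionError e
      (.getMessage e))))
```

Here the error is thrown in the calling thread because `put!` is called
directly.  When the put happens inside a go block, the same error is
raised on one of core.async's pool threads instead, which is why it can
look as if the program just silently stops making progress.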

So, short answer: change your (chan) to something like (chan 10000) and
your code works fine.

But that's probably not what you're looking for.  We really don't want to
have to size the buffer based on the input to the pi function.  We want to
be able to deal with arbitrarily large inputs with some reasonably smallish
buffer.  Because this is a toy problem, it glosses over something that
would be critical for us to think about in a real application: Which runs
faster, the "term" computation or the additions in the accumulator?  The
answer to that question leads to a couple of different solutions.

Before I show some possible solutions, I should mention that I'm not a big
fan of the way you wrote the "term" function.  I would argue that it is
much cleaner to write it as a pure function (something you can test).
Don't inject notions of channels into a pure arithmetic computation.  Also,
you've made the math unnecessarily convoluted with your overuse of ->.
Embrace prefix math.  If you think the -1^k expression is awkward, you could
do something like this:

(defn term [k]
  (/ (if (even? k) 4 -4)
     (inc (* 2 k))))

There, a nice clean pure function that makes it clear that -1^k is just a
"trick" for putting the correct sign on the expression.  You can change 2
to 2.0 if you want to return a double rather than a rational number from
this function (or use this version and just coerce to the double in the
consumer, depending on the behavior you want).
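Because this version of term is pure, you can poke at it at the REPL
without any channels in sight.  A quick sketch; the names `first-terms`,
`partial-sum` and `as-double` are only there for illustration:

```clojure
(defn term [k]
  (/ (if (even? k) 4 -4)
     (inc (* 2 k))))

;; The first four terms of the series, as exact rationals.
(def first-terms (map term (range 4)))     ; 4, -4/3, 4/5, -4/7

;; Summing rationals stays exact; coerce only at the very end.
(def partial-sum (reduce + first-terms))   ; 304/105
(def as-double   (double partial-sum))
```

Keeping the arithmetic exact until the end is the "coerce in the consumer"
option mentioned above; switching the 2 to 2.0 trades that exactness for
speed.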

Or, if you want to use pow (note that Math/pow always returns a double, so
this version does too):
(defn term [k]
  (/ (* 4 (Math/pow -1 k))
     (inc (* 2 k))))

Or, you can leverage the fact that multiplying by 1 or -1 is the same as
dividing and eliminate the multiplication:
(defn term [k]
  (/ 4 (Math/pow -1 k)
     (inc (* 2 k))))

Any of these would be preferable to the -> version which is harder to read.

Now, back to the key question.

Case 1: term is fast, addition is slow
In this case, we really just need one thread to generate all the term
expressions, and we simply want to let the producer get ahead of the
consumer by some bounded amount.  So something like this would work:

(require '[clojure.core.async :as async
           :refer [chan to-chan pipeline <!!]])

(defn pi [n]
  (let [ch (to-chan (map term (range (inc n))))]
    (<!! (async/reduce + 0.0 ch))))

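to-chan uses a bounded buffer of at most 100 elements, and we're also
making use of the way map works with chunked sequences to efficiently do
fast operations.  (Recent core.async releases deprecate to-chan in favor of
to-chan! and to-chan!!; the idea is the same.)  If you don't want chunked
sequence behavior, you can use `sequence` with a transducer: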

(defn pi [n]
  (let [ch (to-chan (sequence (map term) (range (inc n))))]
    (<!! (async/reduce + 0.0 ch))))
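If it helps to see what "let the producer get ahead by a bounded amount"
means without to-chan hiding the details, here is a hand-rolled sketch of
the same idea.  The name `pi-by-hand` and the buffer size of 100 are my
choices for illustration, not anything core.async prescribes.

```clojure
(require '[clojure.core.async :as async
           :refer [chan go-loop >! <!! close!]])

(defn term [k]
  (/ (if (even? k) 4 -4)
     (inc (* 2 k))))

(defn pi-by-hand [n]
  (let [ch (chan 100)]              ; producer may run at most 100 terms ahead
    ;; Single producer: parks on >! whenever the buffer is full.
    (go-loop [k 0]
      (if (<= k n)
        (do (>! ch (term k))
            (recur (inc k)))
        (close! ch)))               ; closing lets the reduce finish
    ;; Consumer: sums terms as they arrive, blocking until the channel closes.
    (<!! (async/reduce + 0.0 ch))))
```

One go block, one small buffer, and n can be as large as you like, because
the producer parks instead of piling up pending puts.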

Case 2: term is slow, addition is fast
The above code will still work if term is slower than addition, but if we
know term is slower, we can improve performance by running the (map term)
transducer on multiple processors.  async's pipeline lets you do just
that.  Let's say we want to use 20 "go blocks".  We'll put all the numbers
0 through n on one channel, pipe it to another channel with the (map term)
transducer using a parallelism count of 20, and reduce the result:

(defn pi [n]
  (let [ch (chan 100)]
    (pipeline 20 ch (map term) (to-chan (range (inc n))))
    (<!! (async/reduce + 0.0 ch))))

Of course, you can get computational parallelism without using channels at
all:

(require '[clojure.core.reducers :as r])

(defn pi [n]
  (r/fold + (r/map term (vec (range (inc n))))))

which runs several times faster (depending on your number of processors)
than the naive:
(defn pi [n]
  (reduce + (map term (range (inc n)))))
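As a sanity check that the parallel fold computes the same thing as the
naive reduce, here is a small sketch using the rational-returning term, so
the comparison is exact.  The names `pi-fold`, `pi-naive` and
`same-answer?` are only for illustration.

```clojure
(require '[clojure.core.reducers :as r])

(defn term [k]
  (/ (if (even? k) 4 -4)
     (inc (* 2 k))))

;; Parallel version: fold splits the vector and combines partial sums with +.
(defn pi-fold [n]
  (r/fold + (r/map term (vec (range (inc n))))))

;; Sequential version for comparison.
(defn pi-naive [n]
  (reduce + (map term (range (inc n)))))

;; Rational addition is exact, so regrouping the sum cannot change the result.
(def same-answer? (= (pi-fold 2000) (pi-naive 2000)))
```

With a double-returning term the two results can differ in the last few
digits, since floating-point addition is not associative and fold groups
the additions differently.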



Summary: core.async forces you to make some choices, because of its
philosophy that unlimited buffers result in error-prone programs that will
crash unexpectedly when resources are exceeded, but once you make those
choices, it gives you some nice high-level constructs to express these
ideas succinctly.

-- 
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to clojure@googlegroups.com
Note that posts from new members are moderated - please be patient with your 
first post.
To unsubscribe from this group, send email to
clojure+unsubscr...@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en