Rich Hickey wrote:
> Again, I don't see the enormous side effect. Streams form a safe,
> stateful pipeline, you'll generally only call seq on the end of the  
> pipe. If you ask for a seq on a stream you are asking for a (lazy)  
> reification. That reification and ownership is what makes the pipeline  
> safe.
>
> I am working on seq/stream api unification right now,  and we will see  
> how often we'll be calling seq fns yet subsequently using as a stream.  
> Many of those places where seq is called will now call stream instead  
> (e.g. sequence fn entry points), and there may be a non-generator- 
> capturing function for determining eos.
I understand, but I found this behaviour surprising:
user=> (defn take1 [s]
  (let [i (stream-iter s)
        n (next! i nil)]
    (detach! i)
    n))

(defn touch [s] (seq s) s)

(def s1 (stream (range 10)))
user=> (take1 s1)
0
user=> (take1 s1)
1
user=> (take1 s1)
2
user=> (touch s1)
#<AStream clojure.lang.astr...@19ee8a>
user=> (take1 s1)
3
user=> (take1 s1)
3
user=> (take1 s1)
3
; s1 is stuck on 3 because stream-iter returns a new iter on a new
; stream on the canonical seq for s1
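To make the "stuck on 3" behaviour easier to see, here is a minimal toy model in Java. It is not the real clojure.lang.AStream: the class name PrePatchStream and its methods are hypothetical, and it realizes the seq eagerly where the real code is lazy. It only mimics the rule that, once seq has been called, every new iterator starts again from the head of the canonical seq.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.stream.IntStream;

// Hypothetical toy model, NOT clojure.lang.AStream: it only mimics the
// pre-patch rule "once seq() has been called, every new iterator starts
// from the head of the canonical seq".
final class PrePatchStream {
    private final Iterator<Integer> src;
    private List<Integer> seq; // canonical seq; null until seq() is called

    PrePatchStream(Iterator<Integer> src) {
        this.src = src;
    }

    // Realizes the remaining items once (eagerly here, lazily in the real code).
    List<Integer> seq() {
        if (seq == null) {
            List<Integer> rest = new ArrayList<>();
            while (src.hasNext()) {
                rest.add(src.next());
            }
            seq = Collections.unmodifiableList(rest);
        }
        return seq;
    }

    // Before seq(): the shared source. After seq(): a fresh iterator over the seq.
    Iterator<Integer> iter() {
        return seq == null ? src : seq.iterator();
    }

    // Mirrors take1 from the transcript: grab an iterator, read one item.
    static Integer take1(PrePatchStream s) {
        Iterator<Integer> i = s.iter();
        return i.hasNext() ? i.next() : null;
    }

    public static void main(String[] args) {
        PrePatchStream s1 = new PrePatchStream(IntStream.range(0, 10).boxed().iterator());
        System.out.println(take1(s1)); // 0
        System.out.println(take1(s1)); // 1
        System.out.println(take1(s1)); // 2
        s1.seq();                      // plays the role of (touch s1)
        System.out.println(take1(s1)); // 3
        System.out.println(take1(s1)); // 3 again: the iterator restarts at the seq head
    }
}
```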


With the attached patch, you get:
user=> (defn take1 [s]
  (let [i (stream-iter s)
        n (next! i nil)]
    (detach! i)
    n))

(def s1 (stream (range 10)))
user=> (take1 s1)
0
user=> (take1 s1)
1
user=> (take1 s1)
2
user=> (seq s1)
(3 4 5 6 7 8 9)
user=> (identical? (seq s1) (seq s1))
true
user=> (take1 s1)
3
user=> (take1 s1)
4
user=> (first s1)
5
;; neither seq lookup nor realization consumes the stream:
user=> (seq s1)
(5 6 7 8 9)
user=> (identical? (seq s1) (seq s1))
true
user=> (first s1)
5
user=> (take1 s1)
5
user=> (take1 s1)
6

I relaxed the constraint that "a stream ensures that every call to seq on a
stream will return the same seq" to "a stream ensures that every call to seq
on a stream will return the same seq as long as the stream state doesn't
change".
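The relaxed invariant can be sketched with another toy model in Java. Again, this is only an illustration under stated assumptions, not the patch itself: RelaxedStream, next() and the eager realization are all hypothetical simplifications. The point it shows is that seq() returns the identical object until the stream is consumed, and a fresh seq of the remaining items afterwards.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.stream.IntStream;

// Hypothetical toy model, NOT the patched clojure.lang.AStream: seq() is
// stable only while the stream state does not change.
final class RelaxedStream {
    private final Iterator<Integer> src;
    private List<Integer> realized;  // remaining items, filled by the first seq()
    private int pos;                 // index of the next unconsumed item in realized
    private List<Integer> cachedSeq; // seq for the current state, or null if stale

    RelaxedStream(Iterator<Integer> src) {
        this.src = src;
    }

    // Same object on every call until next() changes the stream state.
    List<Integer> seq() {
        if (cachedSeq == null) {
            if (realized == null) {
                realized = new ArrayList<>();
                while (src.hasNext()) {
                    realized.add(src.next());
                }
                pos = 0;
            }
            cachedSeq = Collections.unmodifiableList(
                    realized.subList(pos, realized.size()));
        }
        return cachedSeq;
    }

    // Consumes one item (null at end of stream) and invalidates the cached seq.
    Integer next() {
        cachedSeq = null;
        if (realized != null) {
            return pos < realized.size() ? realized.get(pos++) : null;
        }
        return src.hasNext() ? src.next() : null;
    }

    public static void main(String[] args) {
        RelaxedStream s1 = new RelaxedStream(IntStream.range(0, 10).boxed().iterator());
        s1.next(); s1.next(); s1.next();          // consumes 0, 1, 2
        System.out.println(s1.seq());             // [3, 4, 5, 6, 7, 8, 9]
        System.out.println(s1.seq() == s1.seq()); // true: state unchanged
        System.out.println(s1.next());            // 3: consuming continues past the seq call
        System.out.println(s1.seq());             // [4, 5, 6, 7, 8, 9]: a new seq for the new state
    }
}
```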
What did I lose?

Christophe


Index: C:/Documents and Settings/Christophe/workspaces/clojure/clojure-streams/src/jvm/clojure/lang/AStream.java
===================================================================
--- C:/Documents and Settings/Christophe/workspaces/clojure/clojure-streams/src/jvm/clojure/lang/AStream.java	(revision 1222)
+++ C:/Documents and Settings/Christophe/workspaces/clojure/clojure-streams/src/jvm/clojure/lang/AStream.java	(working copy)
@@ -16,28 +16,44 @@
 
     static final ISeq NO_SEQ = new Cons(null, null);
 
-    ISeq seq = NO_SEQ;
+    boolean coseq;
+    AStream seqstream = null;
     final IFn src;
     Cons pushed = null;
     Iter iter = null;
 
     public AStream(IFn src) {
+        this(src, false);
+    }
+
+    public AStream(IFn src, boolean coseq) {
         this.src = src;
+        this.coseq = coseq;
     }
 
     final synchronized public ISeq seq() {
-        if (seq == NO_SEQ)
+    	if (coseq)
+			try {
+				return (ISeq) src.invoke();
+			} catch (Exception e) {
+				throw new RuntimeException(e);
+			}
+    	if (seqstream == null)
             {
             iter();
-            seq = Seq.create(pushed,src);
+            try {
+				seqstream = RT.stream(Seq.create(pushed,src));
+			} catch (Exception e) {
+				throw new RuntimeException(e);
+			}
             }
-        return seq;
+        return seqstream.seq();
     }
 
     final synchronized public AStream stream() throws Exception {
-        if (seq == NO_SEQ)
+        if (seqstream == null)
             return this;
-        return RT.stream(seq);
+        return seqstream;
     }
 
     final synchronized public Iter iter() {
Index: C:/Documents and Settings/Christophe/workspaces/clojure/clojure-streams/src/jvm/clojure/lang/RT.java
===================================================================
--- C:/Documents and Settings/Christophe/workspaces/clojure/clojure-streams/src/jvm/clojure/lang/RT.java	(revision 1222)
+++ C:/Documents and Settings/Christophe/workspaces/clojure/clojure-streams/src/jvm/clojure/lang/RT.java	(working copy)
@@ -244,6 +244,10 @@
 }
 
 static final public IFn EMPTY_GEN = new AFn(){
+    synchronized public Object invoke() throws Exception {
+        return null;
+    }
+	
     synchronized public Object invoke(Object eos) throws Exception {
         return eos;
     }
@@ -461,7 +465,7 @@
 
 static public AStream stream(final Object coll) throws Exception{
 	if(coll == null)
-		return new AStream(EMPTY_GEN);
+		return new AStream(EMPTY_GEN, true);
     else if(coll instanceof Streamable)
         return ((Streamable) coll).stream();
     else if(coll instanceof Fn)
Index: C:/Documents and Settings/Christophe/workspaces/clojure/clojure-streams/src/jvm/clojure/lang/ASeq.java
===================================================================
--- C:/Documents and Settings/Christophe/workspaces/clojure/clojure-streams/src/jvm/clojure/lang/ASeq.java	(revision 1222)
+++ C:/Documents and Settings/Christophe/workspaces/clojure/clojure-streams/src/jvm/clojure/lang/ASeq.java	(working copy)
@@ -177,7 +177,7 @@
 }
 
 public AStream stream() throws Exception {
-    return new AStream(new Src(this));
+    return new AStream(new Src(this), true);
 }
 
 static class Src extends AFn{
@@ -187,6 +187,10 @@
         this.s = s;
     }
 
+	public Object invoke() throws Exception {
+		return s;
+	}
+    
 	public Object invoke(Object eos) throws Exception {
         if(s != null)
             {
