On Thursday, August 14, 2014 at 15:47:44 UTC+4, √ wrote:
>
>
>
>
> On Thu, Aug 14, 2014 at 1:36 PM, Evgeniy Ostapenko <sml...@gmail.com> wrote:
>
>>
>>
>> On Thursday, August 14, 2014 at 11:58:48 UTC+4, √ wrote:
>>
>>>
>>>
>>>
>>> On Wed, Aug 13, 2014 at 8:40 PM, Evgeniy Ostapenko <sml...@gmail.com> 
>>> wrote:
>>>
>>>>
>>>>
>>>> On Wednesday, August 13, 2014 at 19:58:50 UTC+4, √ wrote:
>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Aug 13, 2014 at 4:32 PM, Evgeniy Ostapenko <sml...@gmail.com> 
>>>>> wrote:
>>>>>
>>>>>>
>>>>>>> I'm sorry but I think the standard library disagrees with you:
>>>>>>>
>>>>>>> scala> val i = 0 to 9
>>>>>>> i: scala.collection.immutable.Range.Inclusive = Range(0, 1, 2, 3, 
>>>>>>> 4, 5, 6, 7, 8, 9)
>>>>>>>
>>>>>>> scala> val i2 = i filter (_ % 2 == 0)
>>>>>>> i2: scala.collection.immutable.IndexedSeq[Int] = Vector(0, 2, 4, 6, 
>>>>>>> 8)
>>>>>>>
>>>>>>> scala> i zip i2 foreach println
>>>>>>> (0,0)
>>>>>>> (1,2)
>>>>>>> (2,4)
>>>>>>> (3,6)
>>>>>>> (4,8) 
>>>>>>>
>>>>>>
>>>>>> Ok. But what do you think will be printed in my last "more 
>>>>>> complicated" example? Can you predict that (No)? Can you predict what 
>>>>>> will be printed in the standard library (Yes)?
>>>>>>
>>>>>
>>>>> Since you chose to use a non-deterministic combinator of course the 
>>>>> result is going to be non-deterministic. You got exactly what you 
>>>>> ordered: If you choose to call System.exit(1), the system will exit.
>>>>>
>>>>
>>>> "Merge" is non-deterministic, but "zip" is deterministic. The problem 
>>>> arises when you have a deterministic zip, but one or both of the streams 
>>>> are non-consistent (currently, after a merge).
>>>> Even though "zip" is deterministic, you get a non-consistent result stream.
>>>>
>>>
>>> Yep, but that's reality:
>>>
>>> scala> Iterator.continually(System.nanoTime) zip 
>>> Iterator.continually(System.nanoTime)
>>> res7: Iterator[(Long, Long)] = non-empty iterator
>>>
>>> scala> res7.take(10).toList
>>> res8: List[(Long, Long)] = List((1408002429670460000,1408002429670487000), 
>>> (1408002429670502000,1408002429670503000), 
>>> (1408002429670505000,1408002429670506000), 
>>> (1408002429670508000,1408002429670509000), 
>>> (1408002429670511000,1408002429670512000), 
>>> (1408002429670514000,1408002429670514000), 
>>> (1408002429670517000,1408002429670517000), 
>>> (1408002429670519000,1408002429670520000), 
>>> (1408002429670522000,1408002429670523000), 
>>> (1408002429670525000,1408002429670526000))
>>>
>>>
>> The problem here is: why would you need such a result? That result is 
>> meaningless. In any situation you need a consistent result. In the standard 
>> library such a result is the exception, but in reactive streams it is the rule.
>>
>
> That's incorrect, when you choose `merge` you _choose_ non-determinism, 
> just as you choose non-determinism when you do in my example above. If you 
> don't want non-determinism—don't use merge!
>

consistent != deterministic. See below. 

>  
>
>>
>>  
>>
>>> One thing to keep in mind is that "Streams are not Collections".
>>>
>>
>> Of course. And "Infinite Streams are not Iterators and have no analogs in 
>> standard library":)
>>
>
> An Iterator can most definitely be an infinite source as I have previously 
> shown. Your statement to the contrary is not qualified with arguments.
>

Yes, an Iterator can be infinite, but an iterator only does work when you 
ask for it (by calling next). A stream is always running. 
For example, you can never get a result 
from Iterator.continually(System.nanoTime).fold(0L)(_ + _). The standard 
library assumes that fold is always synchronous and that you do not need such 
cases.
With a stream, however, you can, and often need to, get a result in such a 
case. Maybe you want to create a snapshot from an event-stream log: 

eventsFlow.fold(getObjectFromPreviousSnapshot)((obj, event) => obj apply event)

It is natural that I want to get a snapshot (e.g. the current value of the 
object at runtime, before eventsFlow has stopped).
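
A minimal sketch of what I mean by a snapshot, using only the standard library rather than the Akka Streams API. Deposit, Account and applyEvent are names made up for illustration. Iterator.scanLeft is lazy, so it exposes every intermediate fold state, and a snapshot can be read without waiting for the infinite source to end:

```scala
object SnapshotSketch {
  // Hypothetical event and state types, for illustration only.
  final case class Deposit(amount: Long)
  final case class Account(balance: Long) {
    def applyEvent(e: Deposit): Account = copy(balance = balance + e.amount)
  }

  // An infinite source of events: Deposit(1), Deposit(2), Deposit(3), ...
  def events: Iterator[Deposit] = Iterator.from(1).map(i => Deposit(i.toLong))

  // A fold over `events` would never return; scanLeft yields the state
  // after each event lazily, starting with the initial state.
  def snapshots: Iterator[Account] =
    events.scanLeft(Account(0L))((account, event) => account.applyEvent(event))

  // State after the first three events: 1 + 2 + 3.
  val afterThree: Account = snapshots.drop(3).next()
}
```

A live stream would need the same idea expressed as a running fold that emits its state, which is the behaviour I am asking for here.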
 

>  
>
>>  
>>
>>>
>>>  
>>>
>>>>
>>>> For example, you have a method which requires two flows as parameters:
>>>> def modifyFlow(flow1: Flow[Int], flow2: Flow[Int]): Flow[(Int, Int)] = {
>>>>    flow1 zip flow2.toProducer
>>>> }
>>>>
>>>> Problem: 
>>>> How can you know, from inside this method, whether the result is 
>>>> consistent or not? In the standard library you can require Seq for 
>>>> "consistent" or Set for "inconsistent" :)
>>>>
>>>
>>> You shouldn't care about that, the person handing you the flows is able 
>>> to provide you with any flow they desire to.
>>> And if it doesn't behave as they want/expect, they are in full control 
>>> over passing in something else.
>>>
>>
>> Do you prefer "null" over Option.empty[T]?
>>
>
> It depends, sometimes `null` is the only reasonable option due to the 
> overhead of allocation, and for other use-cases Option is the most 
> appropriate.
> Option is an encoding of 0..1, trying to encode in the type system the 
> number of elements in streams is not possible in many cases (since the 
> elements downstream depends on the nature of elements upstream) and it 
> would definitely not be expressible in Java. What's your argument?
>

Option is better than null, of course. You are working with a strongly typed 
language, so you should use its advantages. 
 

>  
>
>>  
>>
>>>   
>>>
>>>>
>>>>   
>>>>>
>>>>>> Why do you refer to the standard library? Akka-streams includes not 
>>>>>> only a sequence abstraction, but also a time abstraction. It is 
>>>>>> impossible and wrong to compare such different APIs. 
>>>>>>
>>>>>
>>>>> Why so? You made a claim regarding "intuitiveness", and I made an 
>>>>> example where the same intuition as for the standard library holds, 
>>>>> what's wrong with that?
>>>>>  
>>>>>
>>>>>> For example, what does "merge" mean in the standard library?
>>>>>>
>>>>>
>>>>> There is none. So there is nothing to expect.
>>>>>  
>>>>>
>>>>
>>>> Of course none. Because libraries have quite different use cases. 
>>>>
>>>
>>> So what was your point?
>>>
>>
> You also omitted replying here, so I have to assume you agree.
>  
>
>>  
>>>
>>>>  
>>>>
>>>>>  What about "concat" for two infinite streams in Akka-streams? 
>>>>>>
>>>>>
>>>>> What does the following do:
>>>>>
>>>>> scala> Iterator from 1
>>>>> res3: Iterator[Int] = non-empty iterator
>>>>>
>>>>> scala> Iterator from 1
>>>>> res4: Iterator[Int] = non-empty iterator
>>>>>
>>>>> scala> res3 ++ res4
>>>>> res5: Iterator[Int] = non-empty iterator
>>>>>
>>>>> res5.<whatever>
>>>>>  
>>>>>
>>>>>> "fold" or "lastOption" in case infinite stream?
>>>>>>
>>>>>
>>>>> For an infinite stream they will not terminate, this is exactly the 
>>>>> same as for an infinite Iterator as my example above.
>>>>>
>>>>
>>>> The same, but not exactly. First, a stream has no deterministic first 
>>>> element (oh my god, why is it not as in the standard library :). 
>>>>
>>>
>>> This is also true for Iterators.
>>>
>>
> You also omitted replying here, so I have to assume you agree.
>

An infinite iterator is not "hot" or "live". You get the next element only 
when you call next.
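
A small sketch of that difference, using only the standard library. The `produced` counter is added purely for illustration; it records how many elements the source has actually computed, and it only moves when somebody pulls:

```scala
object PullSketch {
  // Counts how many elements the source has actually computed.
  var produced = 0

  // Infinite, but "cold": the by-name block runs once per call to next().
  val source: Iterator[Int] = Iterator.continually { produced += 1; produced }

  // Nothing has been pulled yet, so nothing has been produced.
  val producedBeforePull: Int = produced

  // Pull exactly two elements.
  val firstTwo: List[Int] = source.take(2).toList

  // Only the elements that were asked for have been produced.
  val producedAfterPull: Int = produced
}
```

A hot stream has no such counter under the consumer's control: elements keep arriving whether or not anyone has asked for them.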
 

>  
>
>>   
>>>
>>>> Second: the elements of an infinite Akka stream depend on time (whether 
>>>> you want this or not). In the standard library (excluding the mutable 
>>>> package, which produces concurrency problems) there is no such dependency.
>>>>
>>>
>>> No, it does not strictly depend on time.
>>>
>>> Proof:
>>>
>>> Flow(Iterator.continually(1))
>>>
>>
>> I am talking about an infinite stream (a stream that has no start element 
>> at all). 
>>
>> Such a stream has already started when you connect to it. And the first 
>> element (eventually all elements) that you get depends on the time when 
>> you connect.
>>
>
> That's unrelated to infinity/unboundedness. The word you're looking for is 
> "live" or "hot" streams. Live or hot streams are most usually 
> bounded/finite.
>

Any network traffic is infinite and unbounded; where do you find anything 
finite here? 
For example, I have an infinite stream of events about user transactions.
Those transactions are ordered at the start and must still be ordered at the end.
I need to merge them after preprocessing (converting them to one internal 
type), and merge them in the right order. 
Currently that is hard. But with, for example, a ConsistentFlow (I have not 
implemented it in my library yet, but I will) it could be a piece of cake.
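
To make "merge in the right order" concrete, here is a rough sketch on plain iterators, not on Akka Streams and not the ConsistentFlow itself. Tx and mergeOrdered are hypothetical names, and I assume each input is already sorted by timestamp:

```scala
object OrderedMergeSketch {
  // Hypothetical transaction event, for illustration only.
  final case class Tx(timestamp: Long, source: String)

  // Merges two inputs that are each sorted by timestamp into one sorted output.
  // It always looks at the head of both inputs before emitting anything.
  def mergeOrdered(a: Iterator[Tx], b: Iterator[Tx]): Iterator[Tx] = {
    val left = a.buffered
    val right = b.buffered
    new Iterator[Tx] {
      def hasNext: Boolean = left.hasNext || right.hasNext
      def next(): Tx =
        if (!right.hasNext) left.next()
        else if (!left.hasNext) right.next()
        else if (left.head.timestamp <= right.head.timestamp) left.next()
        else right.next()
    }
  }

  val merged: List[Long] =
    mergeOrdered(
      Iterator(Tx(1L, "a"), Tx(4L, "a")),
      Iterator(Tx(2L, "b"), Tx(3L, "b"))
    ).map(_.timestamp).toList
}
```

On a live stream the equivalent operator would have to wait until both inputs have a pending element before emitting, which is exactly the kind of control that is hard to express today.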
 

>  
>
>>  
>>
>>>  
>>>
>>>>
>>>> Imagine you work only with mutable collections in a highly concurrent 
>>>> environment and without any locks. Yes, all methods are "as in the 
>>>> standard library", but what results do you achieve?
>>>> It is the same thing with infinite streams without consistency control.
>>>>
>>>
>>> The causal relationship between your 2 statements is unclear or omitted.
>>>
>>> My point is: If you as a developer opt into non-determinism, that's up 
>>> to you. There's nothing preventing you from calling System.exit(1) if you 
>>> so desire to do so, who am I to say you can't?
>>> I hope I have proven that you can choose to have fully deterministic (I 
>>> believe in your lingo: consistent) streams if you want to.
>>>
>>
> You also omitted replying here, so I have to assume you agree.
>

One stream: S1 = 1, 2, 3
S2 = S1 map (_ * 2) // 2, 4, 6

S1 zip S2
deterministic and consistent result: (1, 2), (2, 4), (3, 6)
deterministic and non-consistent (suppose we know that S1 emitted its first 
element, and dropped it from the buffer, before S2 started): (1, 4), (2, 6)
non-deterministic and consistent: (2, 4), (3, 6), (1, 2)
non-deterministic and non-consistent: (1, 6), (3, 4), (2, 2)  
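
The same cases can be checked with ordinary collections. This is only a sketch: isConsistent is a hypothetical helper, and I assume "consistent" means that every pair holds an element of S1 together with the value derived from that same element:

```scala
object ConsistencySketch {
  val s1: Vector[Int] = Vector(1, 2, 3)
  val s2: Vector[Int] = s1.map(_ * 2) // 2, 4, 6

  // A pair is consistent when its second value was derived from its first.
  def isConsistent(pairs: Seq[(Int, Int)]): Boolean =
    pairs.forall { case (a, b) => b == a * 2 }

  // Both sides start at the same element: deterministic and consistent.
  val aligned: Vector[(Int, Int)] = s1 zip s2

  // S2 missed the first element: still deterministic, but no longer consistent.
  val shifted: Vector[(Int, Int)] = s1 zip s2.drop(1)

  // Consistent pairs in a different order: ordering does not affect consistency.
  val reordered: Vector[(Int, Int)] = Vector((2, 4), (3, 6), (1, 2))
}
```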

I could go on describing the differences and my opinion for a long time, but 
that is not so important.

I can do what I want, but I want to do it easily :) I am simply trying to 
improve the library that is most applicable to my goals. From my point of 
view, of course.
 

>  
>
>>   
>>>
>>>>  
>>>>
>>>>>   
>>>>>  
>>>>>
>>>>>>
>>>>>>
>>>>>>> Could you give an example of what is non-intuitive and what is 
>>>>>>> complicated (so we can try to address it)?
>>>>>>>  
>>>>>>>
>>>>>>>>  
>>>>>>>>
>>>>>>>>>  
>>>>>>>>>
>>>>>>>>>>  
>>>>>>>>>> Eventually all of this could live inside MaterializeSettings. 
>>>>>>>>>> However, I think MaterializeSettings is the information about "how 
>>>>>>>>>> to create a stream with Akka". 
>>>>>>>>>> In contrast, StreamSettings is information only about the natural 
>>>>>>>>>> stream properties, not linked to any library.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> You are of course free to implement your own version of Reactive 
>>>>>>>>> Streams and provide your own combinators and settings properties 
>>>>>>>>> which would, if following the spec, be completely interoperable with 
>>>>>>>>> other implementations.
>>>>>>>>>
>>>>>>>>
>>>>>>>> Of course. That is always true :)
>>>>>>>> Reactive streams specification:
>>>>>>>> ...
>>>>>>>> Subscriber:
>>>>>>>>
>>>>>>>>    - must not accept an onSubscribe event if it already has an 
>>>>>>>>    active Subscription [7] 
>>>>>>>>
>>>>>>>> TwoStreamInputProcessor
>>>>>>>>
>>>>>>>> So, Akka-streams has stepped outside the Reactive Streams 
>>>>>>>> specification in that area and can no longer refer to it.
>>>>>>>> I think Akka-streams needs to extend the specification, or remove 
>>>>>>>> join operations like zip, merge and others.
>>>>>>>>
>>>>>>>
>>>>>>> There's nothing that forbids an object X that has 2 Subscribers 
>>>>>>> internally, where the first one is connected to Publisher A and the 
>>>>>>> second one to Publisher B, so I see no problem implementing zip, 
>>>>>>> merge, concat or others.
>>>>>>>
>>>>>>
>>>>>> "object X" is "TwoStreamInputProcessor", but why is it named 
>>>>>> "Processor"? :) I think because you need to implement "Processor" with 
>>>>>> an ActorRef as the impl. But in that case the subscribers cannot be 
>>>>>> held in the Processor and are held inside "..Processor extends Actor".
>>>>>> But after this somebody forgot that the impl must anyway satisfy the 
>>>>>> Processor requirements: it is an implementation of 
>>>>>> org.reactivestreams.api.Processor (it does not extend it, but anyway).
>>>>>> And what do you think you get if you call 
>>>>>> ActorProcessor:getSubscriber for a Zip node? The first subscriber or 
>>>>>> the second? And how can you get the second? TwoStreamInputProcessor is 
>>>>>> doing something that goes outside the Reactive Streams interface and 
>>>>>> ideology.
>>>>>> The Akka team is doing big and excellent work, but if you have already 
>>>>>> broken the ideology of Reactive Streams, you can do it again later, who 
>>>>>> cares? :)
>>>>>>
>>>>>
>>>>> Akka Streams is highly experimental, both in terms of implementation 
>>>>> and in terms of API.
>>>>> We are going to be spec and TCK compliant when Reactive Streams has an 
>>>>> updated TCK (currently in process) and we're currently reworking the Akka 
>>>>> Streams implementation as we speak.
>>>>> And, I have already presented an implementation that will preserve 
>>>>> spec compliance as well as support join operations,
>>>>> so you can rest assured that Akka Streams are going to be RS compliant 
>>>>> and also non-experimental in the future.
>>>>>
>>>>
>>>> Hmm, sorry, maybe I missed something. Where can I find your presentation?
>>>>
>>>
>>> trait Joiner[T, U, R] extends Processor[T, R] {
>>>   def otherSubscriber: Subscriber[U]
>>> }
>>>
>>> trait Zipper[T, U] extends Joiner[T, U, (T, U)]
>>>
>>> trait Merger[T] extends Joiner[T, T, T]
>>>
>>> trait Concatenator[T] extends Joiner[T, T, T]
>>>
>>
>> Thank you. Does the updated TCK have a public link?
>>
>
> Watch the PRs for Reactive Streams.
>  
>
>>
>>
>>>
>>>>  -- 
>>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/
>>>> current/additional/faq.html
>>>> >>>>>>>>>> Search the archives: https://groups.google.com/
>>>> group/akka-user
>>>> --- 
>>>> You received this message because you are subscribed to the Google 
>>>> Groups "Akka User List" group.
>>>> To unsubscribe from this group and stop receiving emails from it, send 
>>>> an email to akka-user+...@googlegroups.com.
>>>> To post to this group, send email to akka...@googlegroups.com.
>>>>
>>>> Visit this group at http://groups.google.com/group/akka-user.
>>>> For more options, visit https://groups.google.com/d/optout.
>>>>
>>>
>>>
>>>
>>> -- 
>>> Cheers,
>>> √
>>>  
>>
>
>
>
> -- 
> Cheers,
> √
>  

