On Wed, Aug 13, 2014 at 8:40 PM, Evgeniy Ostapenko <sml...@gmail.com> wrote:

>
>
> On Wednesday, August 13, 2014 at 19:58:50 UTC+4, user √ wrote:
>
>>
>>
>>
>> On Wed, Aug 13, 2014 at 4:32 PM, Evgeniy Ostapenko <sml...@gmail.com>
>> wrote:
>>
>>>
>>>> I'm sorry but I think the standard library disagrees with you:
>>>>
>>>> scala> val i = 0 to 9
>>>> i: scala.collection.immutable.Range.Inclusive = Range(0, 1, 2, 3, 4,
>>>> 5, 6, 7, 8, 9)
>>>>
>>>> scala> val i2 = i filter (_ % 2 == 0)
>>>> i2: scala.collection.immutable.IndexedSeq[Int] = Vector(0, 2, 4, 6, 8)
>>>>
>>>> scala> i zip i2 foreach println
>>>> (0,0)
>>>> (1,2)
>>>> (2,4)
>>>> (3,6)
>>>> (4,8)
>>>>
>>>
>>> Ok. But what do you think will be printed in my last "more complicated"
>>> example? Can you predict that (no)? Can you predict what will be printed
>>> with the standard library (yes)?
>>>
>>
>> Since you chose to use a non-deterministic combinator of course the
>> result is going to be non-deterministic. You got exactly what you ordered:
>> If you choose to call System.exit(1), the system will exit.
>>
>
> "Merge" is non-deterministic, but "zip" is deterministic. The problem here
> is when you have a deterministic zip but one or both of the streams are
> inconsistent (currently, after a merge).
> Even though "zip" is deterministic, you get an inconsistent result stream.
>

Yep, but that's reality:

scala> Iterator.continually(System.nanoTime) zip
Iterator.continually(System.nanoTime)
res7: Iterator[(Long, Long)] = non-empty iterator

scala> res7.take(10).toList
res8: List[(Long, Long)] = List((1408002429670460000,1408002429670487000),
(1408002429670502000,1408002429670503000),
(1408002429670505000,1408002429670506000),
(1408002429670508000,1408002429670509000),
(1408002429670511000,1408002429670512000),
(1408002429670514000,1408002429670514000),
(1408002429670517000,1408002429670517000),
(1408002429670519000,1408002429670520000),
(1408002429670522000,1408002429670523000),
(1408002429670525000,1408002429670526000))

One thing to keep in mind is that "Streams are not Collections".
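
To take the clock out of the picture, here is a small sketch of the same point with a counter instead of System.nanoTime. The object name ZipPairsByPosition and the shared counter are made up for illustration. The only claim is that zip pairs elements by position, while the values are whatever each source yields at the moment it is pulled.

```scala
object ZipPairsByPosition {
  // Hypothetical stateful source: every pull advances one shared counter,
  // standing in for a source whose elements depend on when they are requested.
  private var tick = 0
  def ticks: Iterator[Int] = Iterator.continually { tick += 1; tick }

  // zip pulls the left element, then the right element, for every pair.
  val pairs: List[(Int, Int)] = (ticks zip ticks).take(3).toList
  // pairs == List((1, 2), (3, 4), (5, 6))
}
```

The pairing itself is fully predictable; what differs from a collection is that the elements do not exist until they are requested.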



>
> For example, you have a method which requires two flows as parameters:
> def modifyFlow(flow1: Flow[Int], flow2: Flow[Int]): Flow[(Int, Int)] = {
>    flow1 zip flow2.toProducer
> }
>
> Problem:
> How can you know from inside this method whether the result is consistent
> or not? In the standard library you can require a Seq for "consistent" or
> a Set for "inconsistent" :)
>

You shouldn't care about that: the person handing you the flows is able to
provide you with any flow they desire.
And if it doesn't behave as they want/expect, they are in full control over
passing in something else.
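
A rough sketch of that division of responsibility, using plain Iterators as a stand-in for Flow. This is not the Akka Streams API; pairUp and the object name are hypothetical. The method only zips, and the caller decides which inputs to hand in.

```scala
object CallerControlsInputs {
  // Stand-in for modifyFlow: it neither knows nor cares how its inputs were built.
  def pairUp(left: Iterator[Int], right: Iterator[Int]): Iterator[(Int, Int)] =
    left zip right

  // The caller passes in deterministic inputs, so the result is deterministic.
  val evens: Iterator[Int] = Iterator.from(0).filter(_ % 2 == 0)
  val result: List[(Int, Int)] = pairUp(Iterator.from(0), evens).take(3).toList
  // result == List((0, 0), (1, 2), (2, 4))
}
```

If the caller had passed in something non-deterministic instead, pairUp would behave exactly the same way; only the elements would differ.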


>
>
>>
>>> Why do you refer to the standard library? Akka-streams includes not only
>>> a sequence abstraction, but also a time abstraction. It is impossible and
>>> wrong to compare such different APIs.
>>>
>>
>> Why so? You made a claim regarding "intuitiveness", and I made an example
>> where the same intuition as for the standard library holds, what's wrong
>> with that?
>>
>>
>>> For example, what does "merge" mean in the standard library?
>>>
>>
>> There is none. So there is nothing to expect.
>>
>>
>
> Of course there is none, because the libraries have quite different use
> cases.
>

So what was your point?


>
>
>>> What about "concat" for two infinite streams in Akka-streams?
>>>
>>
>> What does the following do:
>>
>> scala> Iterator from 1
>> res3: Iterator[Int] = non-empty iterator
>>
>> scala> Iterator from 1
>> res4: Iterator[Int] = non-empty iterator
>>
>> scala> res3 ++ res4
>> res5: Iterator[Int] = non-empty iterator
>>
>> res5.<whatever>
>>
>>
>>> "fold" or "lastOption" in the case of an infinite stream?
>>>
>>
>> For an infinite stream they will not terminate; this is exactly the same
>> as for an infinite Iterator, as in my example above.
>>
>
> The same, but not exactly. First, a stream has no deterministic first
> element (oh my god, why not as in the standard library :).
>

This is also true for Iterators.
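
A minimal sketch of what I mean, assuming a hypothetical shared queue as the underlying source (all names here are made up for illustration). The Iterator's first element is not fixed when the Iterator is created, only when it is first pulled.

```scala
import scala.collection.mutable

object FirstElementOnPull {
  // Hypothetical shared source that other code may also consume from.
  private val source = mutable.Queue(10, 20, 30)

  // Creating the iterator does not evaluate or reserve any element.
  val elements: Iterator[Int] = Iterator.continually(source.dequeue())

  // Another consumer takes an element before the iterator is first pulled.
  val takenElsewhere: Int = source.dequeue()

  // The iterator's first element is whatever the source yields at pull time.
  val first: Int = elements.next()
  // takenElsewhere == 10, first == 20
}
```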


> Second: the elements of an Akka infinite stream depend on time (whether you
> want this or not). In the standard library (excluding the mutable package,
> which produces concurrency problems) you have no such dependency.
>

No, it does not strictly depend on time.

Proof:

Flow(Iterator.continually(1))
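
The same thing sketched with only the standard library, so that it runs without Akka. The Flow wrapper is left out here on purpose, and the object name and the sleep are illustration only. The source is infinite, yet any prefix of it is the same no matter when it is pulled.

```scala
object TimeIndependentSource {
  // An infinite source whose elements do not depend on when they are pulled.
  def ones: Iterator[Int] = Iterator.continually(1)

  val firstRun: List[Int] = ones.take(5).toList
  Thread.sleep(10) // let some time pass between the two runs
  val secondRun: List[Int] = ones.take(5).toList
  // firstRun == secondRun == List(1, 1, 1, 1, 1)
}
```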


>
> Imagine you work only with mutable collections in a highly concurrent
> environment and without any locks. Yes, all methods are "as in the standard
> library", but what results do you get?
> It is the same with infinite streams without consistency control.
>

The causal relationship between your 2 statements is unclear or omitted.

My point is: If you as a developer opt into non-determinism, that's up to
you. There's nothing preventing you from calling System.exit(1) if you so
desire; who am I to say you can't?
I hope I have proven that you can choose to have fully deterministic (I
believe in your lingo: consistent) streams if you want to.


>
>
>>
>>
>>
>>>
>>>
>>>> Could you give an example of what is non-intuitive and what is
>>>> complicated (so we can try to address it)?
>>>>
>>>>
>>>>>
>>>>>
>>>>>>
>>>>>>
>>>>>>>
>>>>>>> Eventually all of this can be inside MaterializeSettings. However I
>>>>>>> think MaterializeSettings is the info about "how to create a stream
>>>>>>> with akka".
>>>>>>> In contrast, StreamSettings is info only about the natural stream
>>>>>>> properties, not linked to any library.
>>>>>>>
>>>>>>
>>>>>> You are of course free to implement your own version of Reactive
>>>>>> Streams and provide your own combinators and settings properties which
>>>>>> would, if following the spec, be completely interoperable with other
>>>>>> implementations.
>>>>>>
>>>>>
>>>>> Of course. That is always true :)
>>>>> Reactive streams specification:
>>>>> ...
>>>>> Subscriber:
>>>>>
>>>>>    - must not accept an onSubscribe event if it already has an active
>>>>>    Subscription [7]
>>>>>
>>>>> TwoStreamInputProcessor
>>>>>
>>>>> So, Akka-streams has stepped outside the Reactive Streams specification
>>>>> in that area and can no longer refer to it.
>>>>> I think Akka-streams needs to extend the specification, or remove join
>>>>> operations like zip, merge and others.
>>>>>
>>>>
>>>> There's nothing that says that an object X cannot have 2 Subscribers
>>>> internally, where the first one is connected to Publisher A and the second
>>>> one to Publisher B, so I see no problem implementing zip, merge, concat or
>>>> others.
>>>>
>>>
>>> "object X" is "TwoStreamInputProcessor", but why is it named "Processor"? :)
>>> I think it is because you need to implement "Processor" with an ActorRef as
>>> the impl. But in that case the subscribers can't be held in the Processor
>>> and are held inside "..Processor extends Actor".
>>> But after this somebody forgot that the impl must still satisfy the
>>> Processor requirements: it is an implementation of
>>> org.reactivestreams.api.Processor (it does not extend it, but anyway).
>>> And what do you think you get if you call ActorProcessor:getSubscriber
>>> for a Zip node? The first subscriber or the second? And how can you get
>>> the second?
>>> TwoStreamInputProcessor is doing something that steps outside the
>>> ReactiveStreams interface and ideology.
>>> The Akka team is doing big and excellent work, but if you have already
>>> broken the ideology of Reactive Streams, you can do so again later, who
>>> cares? :)
>>>
>>
>> Akka Streams is highly experimental, both in terms of implementation and
>> in terms of API.
>> We are going to be spec and TCK compliant when Reactive Streams has an
>> updated TCK (currently in progress), and we're reworking the Akka
>> Streams implementation as we speak.
>> And, I have already presented an implementation that will preserve spec
>> compliance as well as support join operations,
>> so you can rest assured that Akka Streams are going to be RS compliant
>> and also non-experimental in the future.
>>
>
> Hmm, sorry, maybe I missed something. Where can I find your presentation?
>

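// A Joiner is a Processor[T, R] for its first input (Processor takes an input
// and an output type) and exposes a second Subscriber for the other input.
trait Joiner[T, U, R] extends Processor[T, R] {
  def otherSubscriber: Subscriber[U]
}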

trait Zipper[T, U] extends Joiner[T, U, (T, U)]

trait Merger[T] extends Joiner[T, T, T]

trait Concatenator[T] extends Joiner[T, T, T]
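
To show how the shape could be used, here is a self-contained sketch. The Subscriber, Publisher and Processor traits below are simplified stand-ins defined locally, not the real Reactive Streams interfaces: they model no Subscription, demand signalling, onError or onComplete. BufferingZipper is a hypothetical name, and unbounded buffering is only acceptable here because it is a toy.

```scala
import scala.collection.mutable

object JoinerSketch {
  // Simplified stand-ins (assumption: element delivery only, no back-pressure).
  trait Subscriber[T] { def onNext(element: T): Unit }
  trait Publisher[T] { def subscribe(subscriber: Subscriber[T]): Unit }
  trait Processor[T, R] extends Subscriber[T] with Publisher[R]

  // The Processor itself is the Subscriber for the first input;
  // otherSubscriber is the Subscriber for the second input.
  trait Joiner[T, U, R] extends Processor[T, R] {
    def otherSubscriber: Subscriber[U]
  }

  // Hypothetical zipper: buffers both sides and emits a pair as soon as
  // each side has at least one element available.
  final class BufferingZipper[T, U] extends Joiner[T, U, (T, U)] {
    private val lefts = mutable.Queue.empty[T]
    private val rights = mutable.Queue.empty[U]
    private val downstream = mutable.ListBuffer.empty[Subscriber[(T, U)]]

    def subscribe(subscriber: Subscriber[(T, U)]): Unit = downstream += subscriber

    def onNext(element: T): Unit = { lefts += element; emitPairs() }

    val otherSubscriber: Subscriber[U] = new Subscriber[U] {
      def onNext(element: U): Unit = { rights += element; emitPairs() }
    }

    private def emitPairs(): Unit =
      while (lefts.nonEmpty && rights.nonEmpty) {
        val pair = (lefts.dequeue(), rights.dequeue())
        downstream.foreach(_.onNext(pair))
      }
  }

  // Usage: feed the two inputs independently and collect the zipped output.
  val received = mutable.ListBuffer.empty[(Int, String)]
  val zipper = new BufferingZipper[Int, String]
  zipper.subscribe(new Subscriber[(Int, String)] {
    def onNext(element: (Int, String)): Unit = received += element
  })
  zipper.onNext(1)
  zipper.onNext(2)
  zipper.otherSubscriber.onNext("a")
  zipper.otherSubscriber.onNext("b")
  // received now holds (1, "a") and (2, "b"), in that order
}
```

Each Subscriber in this sketch has exactly one upstream, which is the point of the Joiner shape: the join operation gets its second input through a second Subscriber rather than by letting one Subscriber accept two subscriptions.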


>  --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at http://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>



-- 
Cheers,
√
