On Fri, Aug 15, 2014 at 11:50 AM, Evgeniy Ostapenko <sml...@gmail.com>
wrote:

>
>
> On Friday, August 15, 2014, 12:46:20 UTC+4, √ wrote:
>
>>
>>
>>
>> On Fri, Aug 15, 2014 at 12:15 AM, Evgeniy Ostapenko <sml...@gmail.com>
>> wrote:
>>
>>
>>
>> On Thursday, August 14, 2014, 21:03:57 UTC+4, √ wrote:
>>
>>
>>
>>
>> On Thu, Aug 14, 2014 at 5:53 PM, Evgeniy Ostapenko <sml...@gmail.com>
>> wrote:
>>
>>
>>
>> On Thursday, August 14, 2014, 18:59:51 UTC+4, √ wrote:
>>
>>
>>
>>
>> On Thu, Aug 14, 2014 at 2:45 PM, Evgeniy Ostapenko <sml...@gmail.com>
>> wrote:
>>
>>
>>
>> On Thursday, August 14, 2014, 15:47:44 UTC+4, √ wrote:
>>
>>
>>
>>
>> On Thu, Aug 14, 2014 at 1:36 PM, Evgeniy Ostapenko <sml...@gmail.com>
>> wrote:
>>
>>
>>
>> On Thursday, August 14, 2014, 11:58:48 UTC+4, √ wrote:
>>
>>
>>
>>
>> On Wed, Aug 13, 2014 at 8:40 PM, Evgeniy Ostapenko <sml...@gmail.com>
>> wrote:
>>
>>
>>
>> On Wednesday, August 13, 2014, 19:58:50 UTC+4, √ wrote:
>>
>>
>>
>>
>> On Wed, Aug 13, 2014 at 4:32 PM, Evgeniy Ostapenko <sml...@gmail.com>
>> wrote:
>>
>>
>> I'm sorry but I think the standard library disagrees with you:
>>
>> scala> val i = 0 to 9
>> i: scala.collection.immutable.Range.Inclusive = Range(0, 1, 2, 3, 4, 5,
>> 6, 7, 8, 9)
>>
>> scala> val i2 = i filter (_ % 2 == 0)
>> i2: scala.collection.immutable.IndexedSeq[Int] = Vector(0, 2, 4, 6, 8)
>>
>> scala> i zip i2 foreach println
>> (0,0)
>> (1,2)
>> (2,4)
>> (3,6)
>> (4,8)
>>
>>
>> Ok. But what do you think will be printed in my last "more complicated"
>> example? Can you predict that? (No.) Can you predict what would be printed
>> by the standard library? (Yes.)
>>
>>
>> Since you chose to use a non-deterministic combinator of course the
>> result is going to be non-deterministic. You got exactly what you ordered:
>> If you choose to call System.exit(1), the system will exit.
>>
>>
>> "Merge" is non-deterministic, but "zip" is deterministic. The problem here
>> is when you have a deterministic zip but one or both of the input streams
>> are non-consistent (e.g. downstream of a merge, currently).
>> Even though "zip" is deterministic, you get a non-consistent result stream.
>>
>>
>> Yep, but that's reality:
>>
>> scala> Iterator.continually(System.nanoTime) zip
>> Iterator.continually(System.nanoTime)
>> res7: Iterator[(Long, Long)] = non-empty iterator
>>
>> scala> res7.take(10).toList
>> res8: List[(Long, Long)] = List((1408002429670460000,1408002429670487000),
>> (1408002429670502000,1408002429670503000), 
>> (1408002429670505000,1408002429670506000),
>> (1408002429670508000,1408002429670509000), 
>> (1408002429670511000,1408002429670512000),
>> (1408002429670514000,1408002429670514000), 
>> (1408002429670517000,1408002429670517000),
>> (1408002429670519000,1408002429670520000), 
>> (1408002429670522000,1408002429670523000),
>> (1408002429670525000,1408002429670526000))
>>
>>
>> The problem here: why would you need such a result? That result is
>> meaningless. In any situation you need a consistent result. In the standard
>> library such a result is the exception, but in reactive streams it is the rule.
>>
>>
>> That's incorrect: when you choose `merge` you _choose_ non-determinism,
>> just as you chose non-determinism in my example above. If you don't want
>> non-determinism, don't use merge!
>>
>>
>> consistent != deterministic. See below.
>>
>>
>>
>>
>>
>>
>>  One thing to keep in mind is that "Streams are not Collections".
>>
>>
>> Of course. And "Infinite Streams are not Iterators and have no analog in
>> the standard library" :)
>>
>>
>> An Iterator can most definitely be an infinite source as I have
>> previously shown. Your statement to the contrary is not qualified with
>> arguments.
>>
>>
>> Yes, an Iterator can be infinite, but an iterator only does work when you
>> need it to (i.e. when you call next).
>>
>>
>> ? What if you have an Iterator to a collection that is being mutated by
>> another thread?
>>
>>
>> Yes! That is a concurrency problem, and you need a tool for concurrency
>> control that eventually preserves consistency. In the standard library you
>> have locks, synchronized, STM... Where is such a tool in Akka Streams? I
>> understand that it is "under work" :)
>>
>>
>> It doesn't matter for my example :) I just showed that you have the same
>> problem with that kind of Iterator :)
>>
>>
>>
>>
>>
>>
>> A stream is always working.
>> For example, you can't get a result from
>> Iterator.continually(System.nanoTime).fold(0L)(_ + _). The standard library
>> assumes that fold is always synchronous and that you do not need such cases.
>> But for a stream you can, and often need to, get a result in such a case.
>> Maybe you want to create a snapshot of an event-stream log.
>>
>>
>> Ah, do I understand you right in that you mean that there's no way of
>> knowing whether you need to guard the operations that might not make sense
>> on unbounded inputs?
>>
>>
>> Yes! Currently the user needs to handle such cases "by hand", with a lot
>> of parasitic code.
>>
>>
>> Alright, now we're getting somewhere! So you'd like to be able to discern
>> bounded from unbounded streams, and hot from cold ones? What's your proposal
>> there?
>>
>> 1) Split Publisher and Subscriber into two classes each:
>> ConsistentPublisher/Publisher and ConsistentSubscriber/Subscriber.
>>
>>
>> And they don't inherit/extend each other? (Meaning that all end-user APIs
>> will be bifurcated?)
>>
> They can share part of the API, like Duct and Flow extend Builder.
> Ultimately: bifurcated use cases, bifurcated APIs.
>

Everyone has different use-cases; that doesn't mean that everyone should
have their own APIs.


>
>
>>
>>
>> That is needed because, for consistency, you need to wrap each element of
>> the stream in an object with version info (you can see this in my library).
>>
>>
>> Version info?
>>
> To be sure that you operate on elements of one version. I mean you can have
> two elements which were both produced by a transformation of a single source
> element.
>

It sounds like you are doing something Akka Streams (or Reactive Streams)
was never intended to solve. Perhaps it would be easier to tailor something
for your use-case.


>
>
>>
>>
>> If you need consistency more than performance, then you need the
>> Consistent* version. If you do not need join operations, you can choose the
>> performance version.
>>
>>
>> What if all you get is a non-consistent source? You can't make that
>> consistent no matter what you do.
>>
> Yes. However, if the user knows how it can be done, he must have the
> ability to declare this. For example `asConsistent`.
>

That doesn't make sense: either you _can_ turn non-consistent sources into
consistent ones, and what exists today is not a problem, or you can't, and
your proposal will not work.


>
>>
>> 2) Add a `preserveOrder` flag for Consistent streams. If
>> `preserveOrder == false` then you do not need to preserve order in
>> `mapFuture`, but you do in `merge`.
>>
>>
>> Why would you need a flag? Since reordering changes the observable
>> semantics of the stream, shouldn't it be a separate API, so that one cannot
>> just flip a boolean flag but needs to design for reordering?
>>
> Only the user defines the semantics of the stream, and if the user sets
> `preserveOrder` = false, then when reordering happens it will not change the
> stream's semantics. In contrast, if `preserveOrder` = true, order must be
> preserved until the user applies something like `asNonConsistent`.
>

Of course it will change stream semantics. What if "Died" comes before
"Born"?
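To make the point concrete, here is a small self-contained Scala sketch (the `Born`/`Died` events and `applyEvent` state function are made up for illustration, not from any library): folding the same events in a different order yields a different final state, which is exactly why implicit reordering changes the observable semantics of a stream.

```scala
// Hypothetical events and state: a reordered event stream folds to a
// different result, i.e. reordering changes observable semantics.
object ReorderingChangesSemantics extends App {
  sealed trait Event
  case object Born extends Event
  case object Died extends Event

  // The "alive" state after applying one event.
  def applyEvent(alive: Boolean, e: Event): Boolean = e match {
    case Born => true
    case Died => false
  }

  val inOrder   = List(Born, Died).foldLeft(false)(applyEvent) // false: born, then died
  val reordered = List(Died, Born).foldLeft(false)(applyEvent) // true: alive after "dying"!

  println(s"inOrder = $inOrder, reordered = $reordered")
  assert(inOrder != reordered) // same events, different semantics
}
```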


>
>
>>
>> 3) Maybe add a `sortBy` operation.
>>
>>
>> To what, and how? How does one sort an unbounded stream?
>>
> With a time window and an OverflowStrategy, maybe. It is only for getting an
> ordered stream within a user-defined time window, or maybe other bounds.
> How do you sort an unbounded stream inside mapFuture?
>

The naïve solution would just refrain from asking for more elements until
the current Future completes. No sorting needed.
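That one-in-flight idea can be sketched with plain standard-library Futures (no Akka Streams APIs; `process` is a made-up asynchronous step): chain each call with flatMap so the next element is only handed to `f` after the current Future completes, and the output order falls out for free.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import ExecutionContext.Implicits.global

object OneInFlight extends App {
  // Hypothetical asynchronous processing step.
  def process(i: Int): Future[Int] = Future(i * 2)

  // Ask for the next element only after the current Future completes:
  // chaining with flatMap keeps at most one Future in flight, so the
  // output order is trivially the input order. No sorting needed.
  def mapOneAtATime[A, B](in: List[A])(f: A => Future[B]): Future[List[B]] =
    in.foldLeft(Future.successful(List.empty[B])) { (acc, a) =>
      acc.flatMap(bs => f(a).map(b => bs :+ b))
    }

  val result = Await.result(mapOneAtATime(List(1, 2, 3, 4))(process), 5.seconds)
  println(result) // List(2, 4, 6, 8)
  assert(result == List(2, 4, 6, 8))
}
```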


>
>
>>
>>
>> 4) Change fold to emit its result on each upstream event and return a
>> Flow[R] (without materialization).
>>
>>
>> There'll be a version that emits all intermediate values; it's called
>> "scan" in the standard library:
>>
>> // Unbounded iterator with a fold-like operation that emits intermediate
>> // values:
>>
>> scala> Iterator.from(1).scanLeft(0)({ _ + _ }).take(10).toList
>> res2: List[Int] = List(0, 1, 3, 6, 10, 15, 21, 28, 36, 45)
>>
> Excellent:) Add scan to streams.
>

We will!


>
>>
>> 5) Add an exceptionHandler inside MaterializeSettings. It can be useful for
>> unbounded streams.
>>
>>
>> We're looking into this, for instance providing a supervision strategy
>> for the ActorBasedFlowMaterializer.
>>
> Thank you!
>
>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> eventsFlow.fold(getObjectFromPreviousSnapshot)((obj, event) =>
>>   obj.apply(event))
>>
>> It is natural that I want to get a snapshot (e.g. the current value of the
>> object at runtime, before eventsFlow is stopped).
>>
>>
>> Sure.
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> For example, you have a method which requires two flows as parameters:
>> def modifyFlow(flow1: Flow[Int], flow2: Flow[Int]): Flow[(Int, Int)] = {
>>   flow1 zip flow2.toProducer
>> }
>>
>> Problem:
>> How can you know, from inside this method, whether the result is consistent
>> or not? In the standard library you can require a Seq for "consistent" or a
>> Set for "inconsistent" :)
>>
>>
>> You shouldn't care about that; the person handing you the flows is able
>> to provide you with any flow they desire.
>> And if it doesn't behave as they want/expect, they are in full control
>> over passing in something else.
>>
>>
>> Do you prefer "null" instead of Option.empty[T]?
>>
>>
>> It depends, sometimes `null` is the only reasonable option due to the
>> overhead of allocation, and for other use-cases Option is the most
>> appropriate.
>> Option is an encoding of 0..1, trying to encode in the type system the
>> number of elements in streams is not possible in many cases (since the
>> elements downstream depends on the nature of elements upstream) and it
>> would definitely not be expressible in Java. What's your argument?
>>
>>
>> Option is better than null, of course. You work with a strongly typed
>> language; you need to use its advantages.
>>
>>
>> Yes, but if you cannot achieve good enough performance it doesn't matter
>> how many types you have at your disposal.
>>
>>
>> Ok. However, performance is not always the most important thing. If
>> something does not work, performance does not matter.
>> For example, another topic from the Akka user list:
>> https://groups.google.com/forum/?hl=ru#!topic/akka-user/UNQeN-F6SRQ
>>
>>
>>
>>
>> What I want to ask is: why must the user solve the problem with a lot of
>> custom code that breaks the Reactive Streams specification?
>>
>>
>> As you see in that thread, what the user asks for is in the works :)
>>
>>
>>
>>  I think because the specification is still a work in progress and is not
>> good enough yet.
>>
>>
>> ? Got an example?
>>
>> Join operations break the specification. Maybe not in the future, but they
>> do now.
>>
>>
>> No, join operations do not break the specification—the current,
>> work-in-progress implementation in Akka Streams does, but it is being
>> rewritten as we're currently getting the TCK up to date with 0.4 of the
>> spec.
>>
>> TL;DR: The spec works just fine for join and split operations.
>>
> Ok. This branch is closed:)
>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> Why do you refer to the standard library? Akka Streams includes not only a
>> sequence abstraction but also a time abstraction. It is impossible, and
>> wrong, to compare such different APIs.
>>
>>
>> Why so? You made a claim regarding "intuitiveness", and I gave an example
>> where the same intuition as for the standard library holds; what's wrong
>> with that?
>>
>>
>> For example, what does "merge" mean in the standard library?
>>
>>
>> There is none. So there is nothing to expect.
>>
>>
>>
>> Of course there is none, because the libraries have quite different use cases.
>>
>>
>> So what was your point?
>>
>>
>> You also omitted replying here, so I have to assume you agree.
>>
>>
>>
>>
>>
>>
>>  What about "concat" for two infinite streams in Akka Streams?
>>
>>
>> What does the following do:
>>
>> scala> Iterator from 1
>> res3: Iterator[Int] = non-empty iterator
>>
>> scala> Iterator from 1
>> res4: Iterator[Int] = non-empty iterator
>>
>> scala> res3 ++ res4
>> res5: Iterator[Int] = non-empty iterator
>>
>> res5.<whatever>
>>
>>
>> What about "fold" or "lastOption" in the case of an infinite stream?
>>
>>
>> For an infinite stream they will not terminate, this is exactly the same
>> as for an infinite Iterator as my example above.
>>
>>
>> The same, but not exactly. First, a stream has no deterministic first
>> element (oh my god, why not as in the standard library :)
>>
>>
>> This is also true for Iterators.
>>
>>
>> You also omitted replying here, so I have to assume you agree.
>>
>>
>> An infinite iterator is not "hot" or "live". You get the next element only
>> when you call next.
>>
>>
>> Not true for Iterators backed by concurrently modified collections.
>>
>>
>>
>>
>>
>>
>>
>>
>> Second: the elements of an infinite Akka stream depend on time (whether you
>> want this or not). In the standard library (excluding the mutable package,
>> which produces concurrency problems) you have no such dependency.
>>
>>
>> No, it does not strictly depend on time.
>>
>> Proof:
>>
>> Flow(Iterator.continually(1))
>>
>>
>> I am talking about an infinite stream (a stream that has no start element
>> at all).
>>
>> Such a stream has already started when you connect to it. And the first
>> element (eventually all the elements) that you get depends on the time at
>> which you connect.
>>
>>
>> That's unrelated to infinity/unboundedness. The word you're looking for
>> is "live" or "hot" streams. Live or hot streams are most usually
>> bounded/finite.
>>
>>
>> Any network traffic is infinite and unbounded; where do you find finiteness
>> here?
>>
>>
>> It's up to the protocol to define this. Datagrams are bounded for
>> instance.
>>
>>
>> For example I have infinite stream of events about user transactions.
>>
>>
>> Let's use "unbounded" instead of "infinite"; infinities are much more than
>> just unboundedness, see negative infinities etc.
>>
>> Ok. That makes sense.
>>
>>
>>
>>
>> Those transactions are ordered at the start and must be ordered at the end.
>> I need to merge them.
>>
>>
>> what is "them" here? The transactions or the streams? (I see only one
>> stream mentioned)
>>
>> Ah, sorry. After I receive the raw messages I need to process some kinds of
>> messages in another thread (service) to append additional data to them, so
>> eventually I get a few streams of messages of one internal type. And I need
>> to merge those messages in the right order.
>>
>>
>> `map` and `mapFuture` preserve order.
>>
>> I need to split/splice the stream into a few others and push them into
>> different methods; then I need to merge the results. Merge does that in
>> random order. map or mapFuture is not what I need.
>>
>>
>> Merge is random (non-deterministic) already. Sounds like it would work
>> just fine?
>>
> I need to merge in a deterministic way. Sorry for my English :( I sense
> that you do not understand me sometimes.
>

Merge deterministically == concat or round-robin
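For illustration, a deterministic round-robin "merge" over plain Iterators (an assumed sketch, not an Akka Streams operator): the interleaving is fully determined by the inputs, unlike the racy `merge`.

```scala
object RoundRobin extends App {
  // A deterministic "merge": strict round-robin interleaving of two
  // sources, falling back to the non-empty side when one is exhausted.
  def roundRobin[A](a: Iterator[A], b: Iterator[A]): Iterator[A] =
    new Iterator[A] {
      private var takeFromA = true
      def hasNext: Boolean = a.hasNext || b.hasNext
      def next(): A = {
        // Take from `a` on its turn (if it has elements), or when `b` is done.
        val useA = (takeFromA && a.hasNext) || !b.hasNext
        takeFromA = !takeFromA
        if (useA) a.next() else b.next()
      }
    }

  val merged = roundRobin(Iterator(1, 3, 5), Iterator(2, 4, 6)).toList
  println(merged) // List(1, 2, 3, 4, 5, 6)
  assert(merged == List(1, 2, 3, 4, 5, 6))
}
```

Running it twice on the same inputs always yields the same list, which is the "consistent" property being discussed.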


>
>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> after preparing (converting to one internal type) and merging in the right
>> order. Currently that is hard. But with ConsistentFlow (I have not
>> implemented it in my library yet, but I will), for example, it can be a
>> piece of cake.
>>
>>
>> Perhaps it's easier if you explain in pseudocode. Thanks.
>>
>> I am trying :) I am writing my library for that, but I do not have enough
>> time, and the work is going slowly.
>>
>>
>> Sorry about that!
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> Imagine you work only with mutable collections in a highly concurrent
>> environment, without any locks. Yes, all the methods are "as in the
>> standard library", but what results would you achieve?
>> The same goes for infinite streams without consistency control.
>>
>>
>> The causal relationship between your 2 statements is unclear or omitted.
>>
>> My point is: if you as a developer opt into non-determinism, that's up to
>> you. There's nothing preventing you from calling System.exit(1) if you
>> desire to do so; who am I to say you can't?
>> I hope I have proven that you can choose to have fully deterministic (I
>> believe in your lingo: consistent) streams if you want to.
>>
>>
>> You also omitted replying here, so I have to assume you agree.
>>
>>
>> One stream: S1 = 1, 2, 3
>> S2 = S1 map (_ * 2) // 2, 4, 6
>>
>> S1 zip S2
>> deterministic and consistent result: (1, 2), (2, 4), (3, 6)
>> deterministic and non-consistent (suppose we know that S1 emits, and drops
>> from its buffer, the first element before S2 has started): (1, 4), (2, 6)
>>
>>
>> So it consistently drops the first element? Sounds quite consistent to me.
>>
>> It drops the first element implicitly for the user. With "suppose we know"
>> I tried to describe what really happens and why we get such a result.
>>
>>
>> But if it does so consistently, then the user always gets the same
>> results, and as such, it is deterministic.
>>
>> Ahh, sorry, I was wrong.
>> deterministic and non-consistent:
>> S2 = S1 filter (_ % 2 != 0) // 1, 3
>> S1 zip S2 // (1, 1), (2, 3)
>>
>>
>> So what you're saying is that dropping elements is non-consistent? so
>> "filter" is a potentially non-consistent operation?
>>
> No. Not `filter` but `zip`. Any join operation is non-consistent by
> default. Excluding concat, maybe.
>

No, round-robin is consistent, concat is consistent. In fact, right now
only merge is non-consistent (nondeterministic).
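The distinction can be replayed with plain collections: `zip` is always deterministic positional pairing, and it is the combination with `filter` that makes the pairs stop relating an element to its own transform (the "non-consistent" case in the S1/S2 example above).

```scala
object ZipAlignment extends App {
  val s1 = List(1, 2, 3)

  // Consistent pairing: each element is paired with its own transform.
  val s2 = s1.map(_ * 2)            // List(2, 4, 6)
  println(s1.zip(s2))               // List((1,2), (2,4), (3,6))
  assert(s1.zip(s2) == List((1, 2), (2, 4), (3, 6)))

  // Positional pairing after filter: still deterministic, but the pairs
  // no longer relate an element to its own transform.
  val s3 = s1.filter(_ % 2 != 0)    // List(1, 3)
  println(s1.zip(s3))               // List((1,1), (2,3))
  assert(s1.zip(s3) == List((1, 1), (2, 3)))
}
```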


>
>
>>
>>
>>
>>
>>
>>
>>
>> non-deterministic and consistent: (2, 4), (3, 6), (1, 2)
>>
>>
>> This is not permissible by the spec (implicit reordering that is).
>>
>> Yes. But it can also be explicit. And people need this feature :) (see the
>> topic referenced earlier)
>>
>>
>> The only means of doing so would be to have a sliding window of buffering
>> and then random-outputting according to demand. The only way you'd get that
>> is by actively opting into it.
>>
>> It is as simple as `mapFuture`, but without preserving ordering.
>>
>>
>> Should be straight-forward to implement.
>>
> Of course. It is easy.
>
>
>>
>>
>> And merge operations breaks specs.
>>
>>
>> I think we've already settled this one.
>>
> Closed:)
>
>>
>>
>>  Moreover, it can duplicate elements: S1 = 1, 2, 3; S2 = S1 map identity;
>> S1 merge S2 = 1, 2, 1, 3, 2, 3
>>
>>
>> Of course, the only way of guaranteeing non-duplicates is to remember the
>> entire stream and deduplicate against the set of all previous values. I
>> don't understand how this is a problem, you actively chose to do what you
>> did?
>>
> Yes, sorry. Duplication is not a problem; such a use case seems slightly
> unbelievable.
> I think this one is closed.
>
>
>>
>>
>>
>>
>>
>>
>>
>>
>> non-deterministic and non-consistent: (1, 6), (3, 4), (2, 2)
>>
>>
>> How's reordering non-determinism at one end and non-consistent on the
>> other end?
>>
>> zip after merge for example.
>>
>>
>> But that is also something
>>
>> ...
>
>  --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at http://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>



-- 
Cheers,
√
