Re: [DISCUSS] FLIP-140: Introduce bounded style execution for keyed streams

2020-09-18 Thread Dawid Wysakowicz
> Ok, got your point now. I agree that it makes more sense to make StateBackend return a contract instead of a particular implementation. How about we name the new interface as `CheckpointableKeyedStateBackend`? We could make `BoundedStreamStateBackend` implement

Re: [DISCUSS] FLIP-140: Introduce bounded style execution for keyed streams

2020-09-18 Thread Yu Li
*bq. The problem is that I could not use this "state backend" in a StreamOperator.* Ok, got your point now. I agree that it makes more sense to make StateBackend return a contract instead of a particular implementation. How about we name the new interface as `CheckpointableKeyedStateBackend`? We

Re: [DISCUSS] FLIP-140: Introduce bounded style execution for keyed streams

2020-09-18 Thread Dawid Wysakowicz
> === > class BoundedStreamInternalStateBackend implements KeyedStateBackend, SnapshotStrategy>, Closeable, CheckpointListener { > === > The problem is that I could

Re: [DISCUSS] FLIP-140: Introduce bounded style execution for keyed streams

2020-09-18 Thread Yu Li
Thanks for the clarification Dawid. Some of my thoughts: *bq. The results are times for end-to-end execution of a job. Therefore the sorting part is included. The actual target of the replacement is RocksDB, which does the serialization and key bytes comparison as well.* I see. Checking the FLIP

Re: [DISCUSS] FLIP-140: Introduce bounded style execution for keyed streams

2020-09-17 Thread Dawid Wysakowicz
Thanks for the comments Yu. > First of all, for the performance testing result, I'm wondering whether the sorting cost is counted in the result for both DataSet and refined DataStream implementations. I could think of the saving of hash computation and final iteration to emit the word-count

Re: [DISCUSS] FLIP-140: Introduce bounded style execution for keyed streams

2020-09-17 Thread Yu Li
Hi all, Sorry for being late to the discussion, but I just noticed there are some state backend related changes proposed in this FLIP, so I would like to share my two cents. First of all, for the performance testing result, I'm wondering whether the sorting cost is counted in the result for both

Re: [DISCUSS] FLIP-140: Introduce bounded style execution for keyed streams

2020-09-09 Thread Kurt Young
Yes, I didn't intend to block this FLIP, and some of the comments are actually implementation details. And all of them are handled internally, not visible to users, thus we can also change or improve them in the future. Best, Kurt On Wed, Sep 9, 2020 at 5:03 PM Aljoscha Krettek wrote: > I

Re: [DISCUSS] FLIP-140: Introduce bounded style execution for keyed streams

2020-09-09 Thread Aljoscha Krettek
I think Kurt's concerns/comments are very valid and we need to implement such things in the future. However, I also think that we need to get started somewhere and I think what's proposed in this FLIP is a good starting point that we can build on. So we should not get paralyzed by thinking too

Re: [DISCUSS] FLIP-140: Introduce bounded style execution for keyed streams

2020-09-09 Thread Dawid Wysakowicz
That's for sure. I am not arguing against it. What I am saying is that we don't necessarily need a true "sorting" in this particular use case. We only need to cluster records with the same keys together. We don't need the keys to be logically sorted. What I am saying is that for clustering the
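The distinction between clustering and logical sorting can be illustrated with a minimal sketch. It assumes a hypothetical big-endian 4-byte key encoding and compares the serialized bytes as unsigned values; the class and method names are made up for illustration and are not taken from Flink or the FLIP.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; not Flink code.
final class ByteKeyClustering {

    // Stand-in for a key serializer: big-endian 4-byte encoding of an int key.
    static byte[] serialize(int key) {
        return ByteBuffer.allocate(4).putInt(key).array();
    }

    // Orders keys by their serialized bytes (unsigned, lexicographic).
    // Equal keys end up adjacent, but the order is not the logical int order.
    static List<Integer> clusterByKeyBytes(List<Integer> keys) {
        List<Integer> result = new ArrayList<>(keys);
        result.sort((a, b) -> Arrays.compareUnsigned(serialize(a), serialize(b)));
        return result;
    }

    public static void main(String[] args) {
        // Negative keys sort after positive ones because their first byte is 0xFF.
        System.out.println(clusterByKeyBytes(List.of(3, -1, 3, 7, -1)));
    }
}
```

In this sketch the key -1 lands after 7, which would be wrong for a logical sort but is sufficient for grouping, since all records with the same key are still contiguous.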

Re: [DISCUSS] FLIP-140: Introduce bounded style execution for keyed streams

2020-09-09 Thread Kurt Young
I doubt that any sorting algorithm would work knowing only that the keys are different, without information about which is greater. Best, Kurt On Tue, Sep 8, 2020 at 10:59 PM Dawid Wysakowicz wrote: > Ad. 1 > > Yes, you are right in principle. > > Let me though clarify my proposal a bit. The

Re: [DISCUSS] FLIP-140: Introduce bounded style execution for keyed streams

2020-09-08 Thread Dawid Wysakowicz
Ad. 1 Yes, you are right in principle. Let me though clarify my proposal a bit. The proposed sort-style execution aims at a generic KeyedProcessFunction where all the "aggregations" are actually performed in the user code. It tries to improve the performance by actually removing the need to use
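As a rough sketch of why key-clustered input helps a keyed function, the example below counts records per key while holding state for only the current key. It assumes that the benefit being described is that per-key state can be dropped as soon as the key changes; the class and method names are hypothetical and not part of the FLIP.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not Flink code.
final class SingleKeyCount {

    // Counts occurrences per key, assuming equal keys are adjacent in the input.
    // Only one key and one counter are kept in memory at any time.
    static List<String> countClustered(List<String> clusteredKeys) {
        List<String> out = new ArrayList<>();
        String currentKey = null;
        long count = 0;
        for (String key : clusteredKeys) {
            if (currentKey != null && !currentKey.equals(key)) {
                // Key changed: emit the finished group and reset the state.
                out.add(currentKey + "=" + count);
                count = 0;
            }
            currentKey = key;
            count++;
        }
        if (currentKey != null) {
            out.add(currentKey + "=" + count);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(countClustered(List.of("b", "b", "a", "c", "c", "c")));
    }
}
```

The input only has to be clustered, not ordered: the keys above arrive as b, a, c and the counts are still correct.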

Re: [DISCUSS] FLIP-140: Introduce bounded style execution for keyed streams

2020-09-08 Thread Kurt Young
Regarding #1, yes the state backend is definitely hash-based execution. However, there are some differences from batch hash-based execution. The key difference is *random access & read/write mixed workload*. For example, by using state backend in streaming execution, one has to mix the read and

Re: [DISCUSS] FLIP-140: Introduce bounded style execution for keyed streams

2020-09-08 Thread Dawid Wysakowicz
Hey Kurt, Thank you for the comments! Ad. 1 I might have missed something here, but as far as I can see, using the current execution stack with regular state backends (RocksDB in particular if we want to have spilling capabilities) is equivalent to hash-based execution. I can see a different

Re: [DISCUSS] FLIP-140: Introduce bounded style execution for keyed streams

2020-09-07 Thread Kurt Young
Hi Dawid, thanks for bringing this up, it's really exciting to see that batch execution is introduced in DataStream. From the FLIP, it seems we are sticking with sort-based execution mode (at least for now), which will sort the whole input data before any *keyed* operation is executed. I have two

Re: [DISCUSS] FLIP-140: Introduce bounded style execution for keyed streams

2020-09-07 Thread Aljoscha Krettek
Yes, I think we can address the problem of indeterminacy in a separate FLIP because we're already in it. Aljoscha On 07.09.20 17:00, Dawid Wysakowicz wrote: @Seth That's a very good point. I agree that RocksDB has the same problem. I think we can use the same approach for the sorted shuffles

Re: [DISCUSS] FLIP-140: Introduce bounded style execution for keyed streams

2020-09-07 Thread Dawid Wysakowicz
@Seth That's a very good point. I agree that RocksDB has the same problem. I think we can use the same approach for the sorted shuffles then. @Aljoscha I agree we should think about making it more resilient, as I guess users might have problems already if they use keys with non-deterministic

Re: [DISCUSS] FLIP-140: Introduce bounded style execution for keyed streams

2020-09-04 Thread Seth Wiesman
There is already an implicit assumption that the TypeSerializer for keys is stable/deterministic: RocksDB compares keys using their serialized byte strings. I think this is a non-issue (or at least it's not changing the status quo). On Fri, Sep 4, 2020 at 6:39 AM Timo Walther wrote: > +1 for getting
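To make the determinism assumption concrete, here is a minimal sketch with a made-up set-valued key. It does not use Flink's TypeSerializer; the two encoders are hypothetical stand-ins showing that two equal keys can serialize to different bytes when the encoding depends on iteration order, which would break any grouping based on byte comparison.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical illustration only; not Flink's TypeSerializer.
final class StableKeyBytes {

    // Writes elements in iteration order: equal sets may yield different bytes.
    static byte[] naive(Set<String> key) {
        return write(key);
    }

    // Writes elements in sorted order: equal sets always yield identical bytes.
    static byte[] stable(Set<String> key) {
        return write(new TreeSet<>(key));
    }

    // Simplified encoding: one length byte per element, then its UTF-8 bytes
    // (assumes each element encodes to fewer than 256 bytes).
    private static byte[] write(Collection<String> elements) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        for (String element : elements) {
            byte[] utf8 = element.getBytes(StandardCharsets.UTF_8);
            bytes.write(utf8.length);
            bytes.write(utf8, 0, utf8.length);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        Set<String> a = new LinkedHashSet<>(Arrays.asList("x", "y"));
        Set<String> b = new LinkedHashSet<>(Arrays.asList("y", "x"));
        System.out.println(a.equals(b));                          // equal as keys
        System.out.println(Arrays.equals(naive(a), naive(b)));    // bytes differ
        System.out.println(Arrays.equals(stable(a), stable(b)));  // bytes match
    }
}
```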

Re: [DISCUSS] FLIP-140: Introduce bounded style execution for keyed streams

2020-09-04 Thread Timo Walther
+1 for getting rid of the TypeComparator interface and relying on the serialized representation for grouping. Adding a new type to DataStream API is quite difficult at the moment due to too many components that are required: TypeInformation (tries to deal with logical fields for

Re: [DISCUSS] FLIP-140: Introduce bounded style execution for keyed streams

2020-09-04 Thread Aljoscha Krettek
Thanks for publishing the FLIP! On 2020/09/01 06:49:06, Dawid Wysakowicz wrote: > 1. How to sort/group keys? What representation of the key should we use? Should we sort on the binary form or should we depend on Comparators being available? Initially, I suggested to Dawid (in

[DISCUSS] FLIP-140: Introduce bounded style execution for keyed streams

2020-09-01 Thread Dawid Wysakowicz
Hi devs, As described in FLIP-131 [1], we intend to deprecate and remove the DataSet API in the future in favour of the DataStream API for both bounded/batch and unbounded/streaming jobs. Ideally, we should be able to stay in the same performance ballpark with bounded DataStream programs as