Re: Sorting bounded data stream

2020-01-30 Thread Jingsong Li
Hi,

If you are using checkpoints, I think a simple approach is to use a ListState to
store all incoming records.
Then, in endInput(), drain all records from the ListState into a sorter and sort
them there.
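A minimal sketch of this buffer-then-sort pattern, written in plain Java so it runs without Flink. An ArrayList stands in for the ListState, and endInput() mirrors the BoundedOneInput callback. The class name BufferThenSort is hypothetical; in a real operator, endInput() would iterate listState.get(), sort, emit through the operator's output, and then call listState.clear().

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical plain-Java stand-in for a buffering sort operator.
 * In Flink, 'buffer' would be a ListState<T> (so it is part of checkpoints)
 * and endInput() would be BoundedOneInput#endInput().
 */
class BufferThenSort<T> {
    private final List<T> buffer = new ArrayList<>();   // stands in for ListState<T>
    private final Comparator<? super T> comparator;
    private final Consumer<T> output;                   // stands in for the operator's Output

    BufferThenSort(Comparator<? super T> comparator, Consumer<T> output) {
        this.comparator = comparator;
        this.output = output;
    }

    /** Called once per record, like processElement(): only buffer it. */
    void processElement(T record) {
        buffer.add(record);
    }

    /** Called once when the bounded input ends: drain, sort, emit. */
    void endInput() {
        List<T> drained = new ArrayList<>(buffer);      // like iterating listState.get()
        buffer.clear();                                 // like listState.clear()
        drained.sort(comparator);
        drained.forEach(output);
    }

    public static void main(String[] args) {
        List<Integer> emitted = new ArrayList<>();
        BufferThenSort<Integer> op = new BufferThenSort<>(Comparator.naturalOrder(), emitted::add);
        for (int x : new int[] {5, 3, 9, 1}) {
            op.processElement(x);
        }
        op.endInput();
        System.out.println(emitted); // [1, 3, 5, 9]
    }
}
```

Each parallel instance sorts only the records it received, so a global order requires running the sorting operator with parallelism 1 (or range-partitioning the data before it). The whole buffer is also sorted in memory at endInput() time, which is where large inputs become a problem.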

Best,
Jingsong Lee


-- 
Best, Jingsong Lee


Re: Sorting bounded data stream

2020-01-30 Thread Łukasz Jędrzejewski

Hi,

Thank you very much for the suggestions. I will check out the
UnilateralSortMerger. However, in our case we are using checkpoints.


Kind regards,
Łukasz



Re: Sorting bounded data stream

2020-01-30 Thread Jingsong Li
Hi Łukasz,

First, we are planning to design and implement the BoundedStream story,
which will be discussed further in 1.11 or 1.12.

SortedMapState was discussed in FLINK-6219 [1], but some problems there could
not be solved well, so it has not been introduced.

If it is a pure BoundedStream without checkpoints, using state is not
recommended: state mainly exists to support checkpointing, so here it only
adds overhead.

SortOperator was introduced for the Table API's BaseRow. I recommend using the
UnilateralSortMerger to build your own SortOperator.
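UnilateralSortMerger's own API is not reproduced here. As a hypothetical illustration of the general technique such a sort-merger is built on (external merge sort), the sketch below fills a fixed-size buffer, sorts and "spills" each full buffer as a sorted run, and at the end k-way merges all runs with a priority queue. Runs are kept in memory so the example is self-contained; a real sorter writes them to disk. The class name ExternalSortSketch and its methods are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical external merge sort sketch (runs kept in memory instead of on disk). */
class ExternalSortSketch<T> {
    private final Comparator<? super T> comparator;
    private final int bufferCapacity;                    // max records held before a spill
    private final List<T> buffer = new ArrayList<>();
    private final List<List<T>> runs = new ArrayList<>();

    ExternalSortSketch(Comparator<? super T> comparator, int bufferCapacity) {
        this.comparator = comparator;
        this.bufferCapacity = bufferCapacity;
    }

    void add(T record) {
        buffer.add(record);
        if (buffer.size() >= bufferCapacity) {
            spill();
        }
    }

    /** Sorts the current buffer and stores it as one sorted run. */
    private void spill() {
        if (buffer.isEmpty()) {
            return;
        }
        List<T> run = new ArrayList<>(buffer);
        run.sort(comparator);
        runs.add(run);                                   // a real sorter would write this run to disk
        buffer.clear();
    }

    /** Spills the last partial buffer, then k-way merges all sorted runs. */
    List<T> finish() {
        spill();
        // Each heap entry holds the current smallest element of one run plus that run's iterator.
        PriorityQueue<Head<T>> heap =
            new PriorityQueue<>((a, b) -> comparator.compare(a.value, b.value));
        for (List<T> run : runs) {
            Iterator<T> it = run.iterator();
            if (it.hasNext()) {
                heap.add(new Head<>(it.next(), it));
            }
        }
        List<T> result = new ArrayList<>();
        while (!heap.isEmpty()) {
            Head<T> smallest = heap.poll();
            result.add(smallest.value);
            if (smallest.rest.hasNext()) {
                heap.add(new Head<>(smallest.rest.next(), smallest.rest));
            }
        }
        runs.clear();
        return result;
    }

    private static final class Head<E> {
        final E value;
        final Iterator<E> rest;

        Head(E value, Iterator<E> rest) {
            this.value = value;
            this.rest = rest;
        }
    }

    public static void main(String[] args) {
        ExternalSortSketch<String> sorter = new ExternalSortSketch<>(Comparator.naturalOrder(), 3);
        for (String s : new String[] {"pear", "apple", "fig", "kiwi", "banana", "cherry", "date"}) {
            sorter.add(s);
        }
        System.out.println(sorter.finish()); // [apple, banana, cherry, date, fig, kiwi, pear]
    }
}
```

Only bufferCapacity records plus one head element per run are held in memory at a time, which is what lets this approach scale past available memory once the runs actually live on disk.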

[1] https://issues.apache.org/jira/browse/FLINK-6219

Best,
Jingsong Lee

On Fri, Jan 31, 2020 at 2:08 AM Łukasz Jędrzejewski  wrote:

> Hi all,
>
> In Flink 1.9, a couple of changes were introduced to deal with bounded
> streams, e.g. the BoundedOneInput interface. I'm wondering whether it would
> be possible to do some kind of global sort after receiving the end-of-input
> event from a finished data stream source, using only the DataStream API.
>
> We have run some experiments with BoundedOneInput: buffering elements,
> sorting them after receiving the end-of-input event, and finally
> emitting the sorted elements. It seems to work as expected, though
> we are having trouble sorting a big stream efficiently. One problem is
> the lack of an appropriate state type, something like SortedMapState. When
> using MapState, the entries are kept in a kind of byte order. I
> think it could be possible to modify the keys to achieve the
> correct byte order, but that is not trivial for every type (string, int,
> tuples, and so on). Do you plan to add such a sorted state?
>
> In the Flink Table API there is a SortOperator, but it is restricted to
> BinaryRow. Would it be possible to adapt this functionality to the streaming
> API for arbitrary types? What do you think?
>
> Thanks,
> Łukasz
>
>
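On the key-modification idea above: a common technique is an order-preserving key encoding, where the encoded bytes compare in unsigned lexicographic order exactly as the original values compare. The hypothetical sketch below covers int, String and an (int, String) tuple, assuming a state backend that iterates map keys in unsigned byte order, as described in the question. It is not a Flink API; the class and method names are illustrative only.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical order-preserving key encodings (not a Flink API). */
class OrderPreservingKeys {

    /** Big-endian int with the sign bit flipped, so negative values sort before positive ones. */
    static byte[] encodeInt(int value) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(value ^ Integer.MIN_VALUE).array();
    }

    /**
     * UTF-8 bytes sort in Unicode code point order, which matches String.compareTo
     * for text without supplementary characters.
     */
    static byte[] encodeString(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    /**
     * Composite (int, String) key: the fixed-width int goes first, so plain concatenation
     * preserves order. A variable-length field in a non-final position would need an
     * escaping or terminator scheme, which is what makes this non-trivial in general.
     */
    static byte[] encodeIntThenString(int first, String second) {
        byte[] a = encodeInt(first);
        byte[] b = encodeString(second);
        byte[] out = Arrays.copyOf(a, a.length + b.length);
        System.arraycopy(b, 0, out, a.length, b.length);
        return out;
    }

    public static void main(String[] args) {
        // Arrays.compareUnsigned (Java 9+) gives unsigned lexicographic byte order.
        System.out.println(Arrays.compareUnsigned(encodeInt(-5), encodeInt(3)) < 0);           // true
        System.out.println(Arrays.compareUnsigned(encodeString("ab"), encodeString("b")) < 0); // true
        System.out.println(Arrays.compareUnsigned(
            encodeIntThenString(1, "zzz"), encodeIntThenString(2, "a")) < 0);                 // true
    }
}
```

Without the sign-bit flip, -5 would encode as 0xFFFFFFFB and sort after 3 in unsigned byte order, which is the kind of mismatch the question describes.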

-- 
Best, Jingsong Lee