Re: Sorting in runners

2018-08-07 Thread Rui Wang
Thanks!

I will try to figure out why GCS is disallowed and share it if I find anything.


-Rui



Re: Sorting in runners

2018-08-07 Thread Lukasz Cwik
IMO, going with SortValues is the right way to go. The idea is that runners
can always replace the SortValues PTransform with their own optimized
variant. As you have already pointed out, the default in-memory
implementation has strict limitations.

I would suggest going with the in-memory version to unblock yourself and
doing some more digging to figure out why GCS was disallowed explicitly,
and possibly to answer your disk provisioning questions.



Sorting in runners

2018-08-07 Thread Rui Wang
Hi Community,

I am trying to support ORDER BY in BeamSQL (currently in the global window
only, see BEAM-5064). In order to do so, I need to sort a PCollection.
The scale of the dataset that ORDER BY works on is unknown. It might be a
TB-sized dataset if BeamSQL runs on some benchmarks, but in most cases the
sorting shouldn't have to handle a very large dataset.

The safe approach is to sort the PCollection in memory, because memory
access should be guaranteed in runners. One possible way is
Combine.globally(new SortCombineFn()), where SortCombineFn does a merge
sort. This approach is bounded by the memory of a single machine.
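To make the idea concrete: the core of such a combiner is merging
already-sorted per-bundle accumulators. A minimal plain-Java sketch of that
merge step (SortCombineFn itself and the integer element type are my
assumptions for illustration, not anything in Beam):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MergeSortedLists {

    // Merge two already-sorted lists into one sorted list, as a
    // CombineFn's mergeAccumulators step might do when each bundle
    // keeps its elements in a sorted accumulator.
    static List<Integer> merge(List<Integer> a, List<Integer> b) {
        List<Integer> out = new ArrayList<>(a.size() + b.size());
        int i = 0, j = 0;
        // Repeatedly take the smaller head of the two lists.
        while (i < a.size() && j < b.size()) {
            if (a.get(i) <= b.get(j)) {
                out.add(a.get(i++));
            } else {
                out.add(b.get(j++));
            }
        }
        // Drain whichever list still has elements.
        while (i < a.size()) out.add(a.get(i++));
        while (j < b.size()) out.add(b.get(j++));
        return out;
    }

    public static void main(String[] args) {
        List<Integer> merged = merge(Arrays.asList(1, 4, 7), Arrays.asList(2, 3, 9));
        System.out.println(merged); // [1, 2, 3, 4, 7, 9]
    }
}
```

Since every accumulator has to fit in memory and the final merged output is
a single in-memory list, this has exactly the single-machine memory bound
described above.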

External sorting could be more scalable by using storage (e.g. disk). There
is some code in beam/sdks/java/extensions/sorter that does this.
However, it seems like GCS is not allowed in ExternalSorter in that sorter
module. Assuming ExternalSorter uses disk by default, it is unclear whether
runners can access disk and how disk space is provisioned. Another
observation is that ExternalSorter does not clean up generated files during
sorting.
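For reference, the general technique that external sorters are built on can
be sketched in plain Java: spill sorted runs to temp files, then k-way merge
them with a priority queue. This is only an illustration of the technique
under those assumptions, not Beam's ExternalSorter implementation:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

public class ExternalSortSketch {

    // Sort more data than fits in memory by bounding in-memory work to
    // runSize elements at a time.
    static List<String> externalSort(List<String> input, int runSize) throws IOException {
        // Phase 1: write sorted runs of at most runSize lines to temp files.
        List<Path> runs = new ArrayList<>();
        for (int start = 0; start < input.size(); start += runSize) {
            List<String> run = new ArrayList<>(
                input.subList(start, Math.min(start + runSize, input.size())));
            Collections.sort(run);
            Path f = Files.createTempFile("run", ".txt");
            Files.write(f, run);
            runs.add(f);
        }

        // Phase 2: k-way merge the runs, keeping one line per run in memory.
        List<String> out = new ArrayList<>();
        PriorityQueue<Map.Entry<String, BufferedReader>> pq =
            new PriorityQueue<>(Map.Entry.comparingByKey());
        List<BufferedReader> readers = new ArrayList<>();
        for (Path f : runs) {
            BufferedReader r = Files.newBufferedReader(f);
            readers.add(r);
            String line = r.readLine();
            if (line != null) pq.add(new AbstractMap.SimpleEntry<>(line, r));
        }
        while (!pq.isEmpty()) {
            Map.Entry<String, BufferedReader> e = pq.poll();
            out.add(e.getKey());
            String next = e.getValue().readLine();
            if (next != null) pq.add(new AbstractMap.SimpleEntry<>(next, e.getValue()));
        }

        // Clean up: close readers and delete the spill files.
        for (BufferedReader r : readers) r.close();
        for (Path f : runs) Files.delete(f);
        return out;
    }

    public static void main(String[] args) throws IOException {
        List<String> data = Arrays.asList("pear", "apple", "kiwi", "fig", "cherry");
        System.out.println(externalSort(data, 2)); // [apple, cherry, fig, kiwi, pear]
    }
}
```

Note the final cleanup step deletes the spill files once the merge is done;
the observation above is that ExternalSorter does not do the equivalent
while sorting.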


My question is: in the major runners (direct, Dataflow, Spark, Flink,
etc.), is disk accessible, so that it is safe to go with the external
sorting approach regardless of disk space? Also, is there a better practice
for sorting in Beam?


Thanks,
Rui