+dev <[email protected]>

In the Beam Java ecosystem, this functionality is provided by the Sorter
library (https://beam.apache.org/documentation/sdks/java-extensions/#sorter).
I'm curious what people think about various options:

 - Python version of the transform(s)
 - Expose sorter as xlang transform(s)
 - Convenience transforms (that use pandas in DoFns?) to just do it for
small data per key to achieve compatibility
 - Beam model extension so that runners can do it as part of GBK
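
To make the "small data per key" option concrete, here is a minimal sketch of the per-key step, assuming all rows for one key fit in memory. It is plain Python so it runs without Beam. The names `group_by_key_locally` and `sort_rows_per_key` are hypothetical, and the local grouping helper only stands in for the (key, values) pairs a GroupByKey would emit. This is not how the Java Sorter library works.

```python
from collections import defaultdict
from operator import itemgetter


def group_by_key_locally(rows, key_field):
    """Hypothetical local stand-in for the (key, values) pairs of a GroupByKey."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[key_field]].append(row)
    return list(groups.items())


def sort_rows_per_key(element, sort_field):
    """Sort one (key, rows) pair in memory; assumes one key's rows are small."""
    key, rows = element
    return key, sorted(rows, key=itemgetter(sort_field))


rows = [
    {"key1": 1001, "key2": 7},
    {"key1": 1000, "key2": 5},
    {"key1": 1001, "key2": 2},
    {"key1": 1000, "key2": 1},
]
# Group by key1, then sort each group by key2.
sorted_groups = dict(
    sort_rows_per_key(element, "key2")
    for element in group_by_key_locally(rows, "key1")
)
```

In a pipeline, the same function could presumably be applied with `beam.Map(sort_rows_per_key, "key2")` after keying the rows by "key1" and running `beam.GroupByKey()`. The order would then hold only inside each emitted list, not across the PCollection.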

Kenn

On Mon, May 10, 2021 at 5:26 PM Wenbing Bai <[email protected]>
wrote:

> Hi Robert and Brian,
>
> I don't know why I didn't catch your replies. But thank you so much for
> looking at this.
>
> My parquet files will be consumed by downstream processes, which require
> data points with the same "key1" to be sorted by "key2". A downstream
> process will, for example, build a rolling window of size N that reads N
> records together at a time. Note, however, that the rolling window will
> never span different "key1" values.
>
> In other words: 1) I don't need to sort the whole dataset, and 2) all data
> with the same "key1" should be located together.
>
> I am not sure whether I have explained the use case clearly. Let me know
> what you think.
>
> Wenbing
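
For reference, the downstream read pattern described above might look like the sketch below. It assumes the rows arrive with equal "key1" values adjacent and already sorted by "key2" within each key. The function name `rolling_windows_per_key` and the sample rows are hypothetical and are not taken from the actual consumer.

```python
from itertools import groupby
from operator import itemgetter


def rolling_windows_per_key(rows, key_field, size):
    """Yield (key, window) for every run of `size` consecutive rows of one key.

    Assumes rows with equal keys are adjacent and sorted within each key.
    Windows never span two different keys.
    """
    for key, group in groupby(rows, key=itemgetter(key_field)):
        group = list(group)
        for start in range(len(group) - size + 1):
            yield key, group[start:start + size]


rows = [
    {"key1": 1000, "key2": 1},
    {"key1": 1000, "key2": 5},
    {"key1": 1000, "key2": 9},
    {"key1": 1001, "key2": 2},
    {"key1": 1001, "key2": 7},
]
# Collect the key2 values seen in each window of size 2.
windows = [
    (key, [row["key2"] for row in window])
    for key, window in rolling_windows_per_key(rows, "key1", size=2)
]
```

This is why a global sort is unnecessary: the consumer only relies on co-location by "key1" and ordering by "key2" inside each key.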
>
>
> On Tue, Apr 20, 2021 at 5:01 PM Robert Bradshaw <[email protected]>
> wrote:
>
>> It would also be helpful to understand what your overall objective is
>> with this output. Is there a reason you need it sorted/partitioned in a
>> certain way?
>>
>> On Tue, Apr 20, 2021 at 4:51 PM Brian Hulette <[email protected]>
>> wrote:
>>
>>> Hi Wenbing,
>>> Sorry for taking so long to get back to you on this.
>>> I discussed this with Robert offline and we came up with a potential
>>> workaround - you could try writing out the Parquet file from within the
>>> groupby.apply method. You can use beam's FileSystems abstraction to open a
>>> Python file object referencing a cloud storage file, and pass that file
>>> object directly to the pandas to_parquet. It would look something like this:
>>>
>>>   df.groupby('key1').apply(lambda df:
>>>       df.sort_values(by='key2').to_parquet(FileSystems.create("gs://bucket/file.pq")))
>>>
>>> If writing out sorted, partitioned parquet files is a common use case, we
>>> should think about making this easier. At the very least, partition_cols
>>> should work; I filed BEAM-12201 [1] for this. That alone won't be enough,
>>> since our implementation will likely reshuffle the dataset to enforce the
>>> partitioning, removing any sorting that you've applied, so we'd also need
>>> to think about how to optimize the pipeline to avoid that shuffle.
>>>
>>> Brian
>>>
>>> [1] https://issues.apache.org/jira/browse/BEAM-12201
>>>
>>> On Wed, Apr 7, 2021 at 9:02 PM Wenbing Bai <[email protected]>
>>> wrote:
>>>
>>>> Thank you, Brian. I tried `partition_cols`, but it is not working. With
>>>> pure pandas it does work, so I am not sure whether something is wrong in
>>>> Beam.
>>>>
>>>> Wenbing
>>>>
>>>> On Wed, Apr 7, 2021 at 2:56 PM Brian Hulette <[email protected]>
>>>> wrote:
>>>>
>>>>> Hm, to_parquet does have a `partition_cols` argument [1] which we pass
>>>>> through [2]. It would be interesting to see what `partition_cols='key1'`
>>>>> does - I suspect it won't work perfectly though.
>>>>>
>>>>> Do you have any thoughts here Robert?
>>>>>
>>>>> [1]
>>>>> https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_parquet.html
>>>>> [2]
>>>>> https://github.com/apache/beam/blob/a8cd05932bed9b2480316fb8518409636cb2733b/sdks/python/apache_beam/dataframe/io.py#L525
>>>>>
>>>>> On Wed, Apr 7, 2021 at 2:22 PM Wenbing Bai <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi Robert and Brian,
>>>>>>
>>>>>> I tried groupby in my case; my pipeline code is below. I do see that
>>>>>> the data in the final parquet file is sorted within each group. However,
>>>>>> I'd like to write each partition (group) to an individual file. How can I
>>>>>> achieve that? In addition, I am using the master branch of the Apache Beam
>>>>>> SDK. How can I test the pipeline with DataflowRunner, considering there is
>>>>>> no Dataflow worker image available?
>>>>>>
>>>>>> import typing
>>>>>> import uuid
>>>>>> from random import randrange
>>>>>>
>>>>>> import apache_beam as beam
>>>>>> from apache_beam.dataframe.convert import to_dataframe
>>>>>>
>>>>>> # 10,000 rows spread over 10 distinct key1 values, with random key2.
>>>>>> data = [
>>>>>>     {
>>>>>>         "key1": 1000 + i % 10,
>>>>>>         "key2": randrange(10000),
>>>>>>         "feature_1": "somestring{}".format(i),
>>>>>>     } for i in range(10000)
>>>>>> ]
>>>>>>
>>>>>> class TestRow(typing.NamedTuple):
>>>>>>     key1: int
>>>>>>     key2: int
>>>>>>     feature_1: str
>>>>>>
>>>>>> with beam.Pipeline() as p:
>>>>>>     pcoll = (
>>>>>>         p
>>>>>>         | beam.Create(data)
>>>>>>         | beam.Map(lambda x: x).with_output_types(TestRow)
>>>>>>     )
>>>>>>
>>>>>>     df = to_dataframe(pcoll)
>>>>>>     # Sort rows by key2 within each key1 group.
>>>>>>     sorted_df = df.groupby('key1').apply(
>>>>>>         lambda df: df.sort_values(by='key2'))
>>>>>>     sorted_df.to_parquet(
>>>>>>         'test_beam_dataframe{}.parquet'.format(str(uuid.uuid4())[:8]),
>>>>>>         engine='pyarrow', index=False)
>>>>>>
>>>>>> On Fri, Apr 2, 2021 at 10:00 AM Wenbing Bai <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Thank you, Robert and Brian.
>>>>>>>
>>>>>>> I'd like to try this out. I am trying to distribute my dataset to
>>>>>>> nodes, sort each partition by some key, and then store each partition
>>>>>>> in its own file.
>>>>>>>
>>>>>>> Wenbing
>>>>>>>
>>>>>>> On Fri, Apr 2, 2021 at 9:23 AM Brian Hulette <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Note groupby.apply [1] in particular should be able to do what you
>>>>>>>> want, something like:
>>>>>>>>
>>>>>>>>   df.groupby('key1').apply(lambda df: df.sort_values('key2'))
>>>>>>>>
>>>>>>>> But as Robert noted, we don't make any guarantees about preserving
>>>>>>>> this ordering later in the pipeline. For this reason I actually just
>>>>>>>> sent a PR to disallow sort_values on the entire dataset [2].
>>>>>>>>
>>>>>>>> Brian
>>>>>>>>
>>>>>>>> [1] https://github.com/apache/beam/pull/13843
>>>>>>>> [2] https://github.com/apache/beam/pull/14324
>>>>>>>>
>>>>>>>> On Fri, Apr 2, 2021 at 9:15 AM Robert Bradshaw <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Thanks for trying this out.
>>>>>>>>>
>>>>>>>>> Better support for groupby (e.g.
>>>>>>>>> https://github.com/apache/beam/pull/13843 ,
>>>>>>>>> https://github.com/apache/beam/pull/13637) will be available in
>>>>>>>>> the next Beam release (2.29, in progress, but you could try out head
>>>>>>>>> if you want). Note, however, that Beam PCollections are by definition
>>>>>>>>> unordered, so unless you sort a partition and immediately do something
>>>>>>>>> with it, that ordering may not be preserved. If you could let us know
>>>>>>>>> what you're trying to do with this ordering, that would be helpful.
>>>>>>>>>
>>>>>>>>> - Robert
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Apr 1, 2021 at 7:31 PM Wenbing Bai <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Beam users,
>>>>>>>>>>
>>>>>>>>>> I have a use case where I need to partition my PCollection by some
>>>>>>>>>> key and then sort the rows within each partition by another key.
>>>>>>>>>>
>>>>>>>>>> I feel Beam DataFrame could be a candidate solution, but I cannot
>>>>>>>>>> figure out how to make it work. Specifically, I tried df.groupby,
>>>>>>>>>> expecting my data to be distributed to different nodes. I also tried
>>>>>>>>>> df.sort_values, but it sorts my whole dataset, which is not what I
>>>>>>>>>> need.
>>>>>>>>>>
>>>>>>>>>> Can someone shed some light on this?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Wenbing Bai
>>>>>>>>>>
>>>>>>>>>> Senior Software Engineer
>>>>>>>>>>
>>>>>>>>>> Data Infrastructure, Cruise
>>>>>>>>>>
>>>>>>>>>> Pronouns: She/Her
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> *Confidentiality Note:* We care about protecting our proprietary
>>>>>>>>>> information, confidential material, and trade secrets. This
>>>>>>>>>> message may contain some or all of those things. Cruise will suffer
>>>>>>>>>> material harm if anyone other than the intended recipient 
>>>>>>>>>> disseminates or
>>>>>>>>>> takes any action based on this message. If you have received this 
>>>>>>>>>> message
>>>>>>>>>> (including any attachments) in error, please delete it immediately 
>>>>>>>>>> and
>>>>>>>>>> notify the sender promptly.
>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
>
