+dev <[email protected]>

In the Beam Java ecosystem, this functionality is provided by the Sorter library (https://beam.apache.org/documentation/sdks/java-extensions/#sorter). I'm curious what people think about various options:
- Python version of the transform(s)
- Expose Sorter as cross-language (xlang) transform(s)
- Convenience transforms (that use pandas in DoFns?) that just do the sort for small per-key data, to achieve compatibility
- Beam model extension so that runners can do it as part of GBK

Kenn

On Mon, May 10, 2021 at 5:26 PM Wenbing Bai <[email protected]> wrote:

> Hi Robert and Brian,
>
> I don't know why I didn't catch your replies, but thank you so much for looking at this.
>
> My Parquet files will be consumed by downstream processes that require data points with the same "key1" to be sorted by "key2". For example, a downstream process will make a rolling window of size N that reads N records at a time. Note, however, that the rolling window will not cross different "key1" values.
>
> In other words: 1) I don't need to sort the whole dataset, and 2) all data with the same "key1" should be located together.
>
> I am not sure if I explained the use case clearly. Let me know what you think.
>
> Wenbing
>
> On Tue, Apr 20, 2021 at 5:01 PM Robert Bradshaw <[email protected]> wrote:
>
>> It would also be helpful to understand what your overall objective is with this output. Is there a reason you need it sorted/partitioned in a certain way?
>>
>> On Tue, Apr 20, 2021 at 4:51 PM Brian Hulette <[email protected]> wrote:
>>
>>> Hi Wenbing,
>>> Sorry for taking so long to get back to you on this.
>>> I discussed this with Robert offline and we came up with a potential workaround: you could try writing out the Parquet file from within the groupby.apply method. You can use Beam's FileSystems abstraction to open a Python file object referencing a cloud storage file, and pass that file object directly to pandas' to_parquet.
>>> It would look something like this:
>>>
>>> # FileSystems.create returns a writable file object (FileSystems.open is read-only).
>>> df.groupby('key1').apply(lambda df:
>>>     df.sort_values(by='key2').to_parquet(FileSystems.create("gs://bucket/file.pq")))
>>>
>>> If writing out sorted, partitioned Parquet files is a common use case, we should think about making this easier, though. At the very least partition_cols should work; I filed BEAM-12201 [1] for this. That alone won't be enough, as our implementation will likely reshuffle the dataset to enforce the partitioning, removing any sorting that you've applied, so we'd also need to think about how to optimize the pipeline to avoid that shuffle.
>>>
>>> Brian
>>>
>>> [1] https://issues.apache.org/jira/browse/BEAM-12201
>>>
>>> On Wed, Apr 7, 2021 at 9:02 PM Wenbing Bai <[email protected]> wrote:
>>>
>>>> Thank you, Brian. I tried `partition_cols`, but it is not working. I tried pure pandas and it does work, so I am not sure if something is wrong with Beam.
>>>>
>>>> Wenbing
>>>>
>>>> On Wed, Apr 7, 2021 at 2:56 PM Brian Hulette <[email protected]> wrote:
>>>>
>>>>> Hm, to_parquet does have a `partition_cols` argument [1], which we pass through [2]. It would be interesting to see what `partition_cols='key1'` does; I suspect it won't work perfectly, though.
>>>>>
>>>>> Do you have any thoughts here, Robert?
>>>>>
>>>>> [1] https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_parquet.html
>>>>> [2] https://github.com/apache/beam/blob/a8cd05932bed9b2480316fb8518409636cb2733b/sdks/python/apache_beam/dataframe/io.py#L525
>>>>>
>>>>> On Wed, Apr 7, 2021 at 2:22 PM Wenbing Bai <[email protected]> wrote:
>>>>>
>>>>>> Hi Robert and Brian,
>>>>>>
>>>>>> I tried groupby in my case. Here is my pipeline code. I do see that all the data in the final Parquet file is sorted within each group. However, I'd like to write each partition (group) to an individual file. How can I achieve that?
>>>>>> In addition, I am using the master branch of the Apache Beam SDK. How can I test the pipeline with DataflowRunner, considering there is no Dataflow worker image available?
>>>>>>
>>>>>> import typing
>>>>>> import uuid
>>>>>> from random import randrange
>>>>>>
>>>>>> import apache_beam as beam
>>>>>> from apache_beam.dataframe.convert import to_dataframe
>>>>>>
>>>>>> data = [
>>>>>>     {
>>>>>>         "key1": 1000 + i % 10,
>>>>>>         "key2": randrange(10000),
>>>>>>         "feature_1": "somestring{}".format(i)
>>>>>>     } for i in range(10000)
>>>>>> ]
>>>>>>
>>>>>> class TestRow(typing.NamedTuple):
>>>>>>     key1: int
>>>>>>     key2: int
>>>>>>     feature_1: str
>>>>>>
>>>>>> with beam.Pipeline() as p:
>>>>>>     pcoll = (
>>>>>>         p
>>>>>>         | beam.Create(data)
>>>>>>         # Convert each dict into the declared schema type.
>>>>>>         | beam.Map(lambda x: TestRow(**x)).with_output_types(TestRow)
>>>>>>     )
>>>>>>
>>>>>>     df = to_dataframe(pcoll)
>>>>>>     # Sort rows by key2 within each key1 group.
>>>>>>     sorted_df = df.groupby('key1').apply(lambda df: df.sort_values(by='key2'))
>>>>>>     sorted_df.to_parquet(
>>>>>>         'test_beam_dataframe{}.parquet'.format(str(uuid.uuid4())[:8]),
>>>>>>         engine='pyarrow', index=False)
>>>>>>
>>>>>> On Fri, Apr 2, 2021 at 10:00 AM Wenbing Bai <[email protected]> wrote:
>>>>>>
>>>>>>> Thank you, Robert and Brian.
>>>>>>>
>>>>>>> I'd like to try this out. I am trying to distribute my dataset to nodes, sort each partition by some key, and then store each partition in its own file.
>>>>>>>
>>>>>>> Wenbing
>>>>>>>
>>>>>>> On Fri, Apr 2, 2021 at 9:23 AM Brian Hulette <[email protected]> wrote:
>>>>>>>
>>>>>>>> Note that groupby.apply [1] in particular should be able to do what you want, something like:
>>>>>>>>
>>>>>>>> df.groupby('key1').apply(lambda df: df.sort_values('key2'))
>>>>>>>>
>>>>>>>> But as Robert noted, we don't make any guarantees about preserving this ordering later in the pipeline. For this reason, I actually just sent a PR to disallow sort_values on the entire dataset [2].
>>>>>>>>
>>>>>>>> Brian
>>>>>>>>
>>>>>>>> [1] https://github.com/apache/beam/pull/13843
>>>>>>>> [2] https://github.com/apache/beam/pull/14324
>>>>>>>>
>>>>>>>> On Fri, Apr 2, 2021 at 9:15 AM Robert Bradshaw <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Thanks for trying this out.
>>>>>>>>>
>>>>>>>>> Better support for groupby (e.g. https://github.com/apache/beam/pull/13843, https://github.com/apache/beam/pull/13637) will be available in the next Beam release (2.29, in progress, but you could try out head if you want). Note, however, that Beam PCollections are by definition unordered, so unless you sort a partition and immediately do something with it, that ordering may not be preserved. If you could let us know what you're trying to do with this ordering, that would be helpful.
>>>>>>>>>
>>>>>>>>> - Robert
>>>>>>>>>
>>>>>>>>> On Thu, Apr 1, 2021 at 7:31 PM Wenbing Bai <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Beam users,
>>>>>>>>>>
>>>>>>>>>> I have a use case where I need to partition my PCollection by some key, and then sort the rows within each partition by some other key.
>>>>>>>>>>
>>>>>>>>>> I feel Beam DataFrame could be a candidate solution, but I cannot figure out how to make it work. Specifically, I tried df.groupby, where I expect my data will be distributed to different nodes. I also tried df.sort_values, but it sorts my whole dataset, which is not what I need.
>>>>>>>>>>
>>>>>>>>>> Can someone shed some light on this?
>>>>>>>>>>
>>>>>>>>>> Wenbing Bai
>>>>>>>>>> Senior Software Engineer
>>>>>>>>>> Data Infrastructure, Cruise
>>>>>>>>>> Pronouns: She/Her
>>>>>>>>>>
>>>>>>>>>> *Confidentiality Note:* We care about protecting our proprietary information, confidential material, and trade secrets. This message may contain some or all of those things. Cruise will suffer material harm if anyone other than the intended recipient disseminates or takes any action based on this message. If you have received this message (including any attachments) in error, please delete it immediately and notify the sender promptly.
>>>>> >>>>> >>>> >>>> -- >>>> >>>> >>>> >>>> >>>> >>>> Wenbing Bai >>>> >>>> Senior Software Engineer >>>> >>>> Data Infrastructure, Cruise >>>> >>>> Pronouns: She/Her >>>> >>>> >>>> >>>> *Confidentiality Note:* We care about protecting our proprietary >>>> information, confidential material, and trade secrets. This message >>>> may contain some or all of those things. Cruise will suffer material harm >>>> if anyone other than the intended recipient disseminates or takes any >>>> action based on this message. If you have received this message (including >>>> any attachments) in error, please delete it immediately and notify the >>>> sender promptly. >>> >>> > > -- > > > > > > Wenbing Bai > > Senior Software Engineer > > Data Infrastructure, Cruise > > Pronouns: She/Her > > > > *Confidentiality Note:* We care about protecting our proprietary > information, confidential material, and trade secrets. This message may > contain some or all of those things. Cruise will suffer material harm if > anyone other than the intended recipient disseminates or takes any action > based on this message. If you have received this message (including any > attachments) in error, please delete it immediately and notify the sender > promptly.

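To make the downstream requirement from Wenbing's May 10 message concrete, here is a plain-Python sketch of a consumer that slides a window of N consecutive records over rows laid out as requested: contiguous per "key1" and sorted by "key2" within each group. It assumes rows are dicts read back in file order; `rolling_windows` is a hypothetical helper, not part of Beam or pandas. Because `itertools.groupby` only merges adjacent rows, it also shows why keeping each "key1" group together matters: rows for one key scattered through the input would be split into several short runs.

```python
from collections import deque
from itertools import groupby
from typing import Any, Deque, Dict, Iterable, Iterator, List, Tuple

Row = Dict[str, Any]


def rolling_windows(
    rows: Iterable[Row], n: int, group_field: str = "key1"
) -> Iterator[Tuple[Any, List[Row]]]:
    """Yields (group_key, window) for every n consecutive rows in one group.

    Assumes rows are already contiguous per group_field and sorted within
    each group (e.g. by key2). Windows never span two group_field values.
    """
    if n <= 0:
        raise ValueError("window size n must be positive")
    # groupby only merges *adjacent* rows with the same key, which is why
    # the input must keep each group together.
    for key, group in groupby(rows, key=lambda row: row[group_field]):
        window: Deque[Row] = deque(maxlen=n)  # oldest row drops off automatically
        for row in group:
            window.append(row)
            if len(window) == n:
                yield key, list(window)


rows = [
    {"key1": 1000, "key2": 1},
    {"key1": 1000, "key2": 5},
    {"key1": 1000, "key2": 9},
    {"key1": 1001, "key2": 2},
    {"key1": 1001, "key2": 4},
]
for key, window in rolling_windows(rows, n=2):
    print(key, [row["key2"] for row in window])
# 1000 [1, 5]
# 1000 [5, 9]
# 1001 [2, 4]
```

A fresh deque per group is what keeps a window from straddling two "key1" values; a group with fewer than N rows simply produces no window.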