It would also be helpful to understand what your overall objective is with this output. Is there a reason you need it sorted/partitioned in a certain way?
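In case it is useful while we work that out, here is a minimal pandas-only sketch of the per-group idea behind the groupby.apply workaround quoted below: split on one key, sort each group by the other key, and write each group to its own Parquet file. The helper names (sorted_partitions, write_partitions), the open_for_write hook, and the key1=<value>.parquet naming are all hypothetical, and the sketch assumes local paths. Inside a Beam pipeline I would expect to pass something like FileSystems.create as open_for_write and build the path with the filesystem's own join, but I have not run that variant.

```python
import os

import pandas as pd


def sorted_partitions(df, partition_key, sort_key):
    """Yield (key, rows) for each partition, with rows sorted by sort_key."""
    for key, group in df.groupby(partition_key):
        yield key, group.sort_values(by=sort_key)


def write_partitions(df, out_dir, partition_key="key1", sort_key="key2",
                     open_for_write=lambda path: open(path, "wb")):
    """Write each sorted partition to its own Parquet file and return the paths.

    open_for_write is a hypothetical hook that returns a writable binary
    file object for a path. The default writes to the local filesystem.
    """
    paths = []
    for key, group in sorted_partitions(df, partition_key, sort_key):
        # One file per partition, named after the partition value (assumed scheme).
        path = os.path.join(out_dir, f"{partition_key}={key}.parquet")
        with open_for_write(path) as handle:
            # Requires the pyarrow package to be installed.
            group.to_parquet(handle, engine="pyarrow", index=False)
        paths.append(path)
    return paths
```

Calling write_partitions(df, out_dir) on a plain pandas DataFrame writes one file per distinct key1 value. Because each group is sorted and written immediately, nothing downstream gets a chance to reorder it, which is the point of doing the write inside the per-group step.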

On Tue, Apr 20, 2021 at 4:51 PM Brian Hulette <bhule...@google.com> wrote:

> Hi Wenbing,
> Sorry for taking so long to get back to you on this.
> I discussed this with Robert offline and we came up with a potential
> workaround - you could try writing out the Parquet file from within the
> groupby.apply method. You can use Beam's FileSystems abstraction to open a
> Python file object referencing a cloud storage file, and pass that file
> object directly to the pandas to_parquet. It would look something like this:
>
> # FileSystems.create returns a writable handle (FileSystems.open is read-only).
> df.groupby('key1').apply(lambda df:
>     df.sort_values(by='key2').to_parquet(FileSystems.create("gs://bucket/file.pq")))
>
> If writing out sorted, partitioned parquet files is a common use-case we
> should think about making this easier though. At the very least
> partition_cols should work; I filed BEAM-12201 [1] for this. That alone
> won't be enough as our implementation will likely reshuffle the dataset to
> enforce the partitioning, removing any sorting that you've applied, so we'd
> also need to think about how to optimize the pipeline to avoid that shuffle.
>
> Brian
>
> [1] https://issues.apache.org/jira/browse/BEAM-12201
>
> On Wed, Apr 7, 2021 at 9:02 PM Wenbing Bai <wenbing....@getcruise.com>
> wrote:
>
>> Thank you, Brian. I tried `partition_cols`, but it is not working. I
>> tried pure pandas and it does work, so I am not sure whether something is
>> wrong with Beam.
>>
>> Wenbing
>>
>> On Wed, Apr 7, 2021 at 2:56 PM Brian Hulette <bhule...@google.com> wrote:
>>
>>> Hm, to_parquet does have a `partition_cols` argument [1] which we pass
>>> through [2]. It would be interesting to see what `partition_cols='key1'`
>>> does - I suspect it won't work perfectly though.
>>>
>>> Do you have any thoughts here Robert?
>>>
>>> [1]
>>> https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_parquet.html
>>> [2]
>>> https://github.com/apache/beam/blob/a8cd05932bed9b2480316fb8518409636cb2733b/sdks/python/apache_beam/dataframe/io.py#L525
>>>
>>> On Wed, Apr 7, 2021 at 2:22 PM Wenbing Bai <wenbing....@getcruise.com>
>>> wrote:
>>>
>>>> Hi Robert and Brian,
>>>>
>>>> I tried groupby in my case. Here is my pipeline code. I do see that all
>>>> the data in the final parquet file is sorted within each group. However,
>>>> I'd like to write each partition (group) to an individual file. How can I
>>>> achieve that? In addition, I am using the master branch of the Apache Beam
>>>> SDK. How can I test the pipeline with DataflowRunner, considering there is
>>>> no Dataflow worker image available?
>>>>
>>>> import typing
>>>> import uuid
>>>> from random import randrange
>>>>
>>>> import apache_beam as beam
>>>> from apache_beam.dataframe.convert import to_dataframe
>>>>
>>>> data = [
>>>>     {
>>>>         "key1": 1000 + i % 10,
>>>>         "key2": randrange(10000),
>>>>         "feature_1": "somestring{}".format(i)
>>>>     } for i in range(10000)
>>>> ]
>>>>
>>>> class TestRow(typing.NamedTuple):
>>>>     key1: int
>>>>     key2: int
>>>>     feature_1: str
>>>>
>>>> with beam.Pipeline() as p:
>>>>     pcoll = (
>>>>         p
>>>>         | beam.Create(data)
>>>>         | beam.Map(lambda x: x).with_output_types(TestRow)
>>>>     )
>>>>
>>>>     df = to_dataframe(pcoll)
>>>>     # Sort rows by key2 within each key1 group.
>>>>     sorted_df = df.groupby('key1').apply(
>>>>         lambda df: df.sort_values(by='key2'))
>>>>     sorted_df.to_parquet(
>>>>         'test_beam_dataframe{}.parquet'.format(str(uuid.uuid4())[:8]),
>>>>         engine='pyarrow', index=False)
>>>>
>>>> On Fri, Apr 2, 2021 at 10:00 AM Wenbing Bai <wenbing....@getcruise.com>
>>>> wrote:
>>>>
>>>>> Thank you, Robert and Brian.
>>>>>
>>>>> I'd like to try this out. I am trying to distribute my dataset to
>>>>> nodes, sort each partition by some key and then store each partition to
>>>>> its own file.
>>>>>
>>>>> Wenbing
>>>>>
>>>>> On Fri, Apr 2, 2021 at 9:23 AM Brian Hulette <bhule...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Note groupby.apply [1] in particular should be able to do what you
>>>>>> want, something like:
>>>>>>
>>>>>> df.groupby('key1').apply(lambda df: df.sort_values('key2'))
>>>>>>
>>>>>> But as Robert noted we don't make any guarantees about preserving
>>>>>> this ordering later in the pipeline. For this reason I actually just
>>>>>> sent a PR to disallow sort_values on the entire dataset [2].
>>>>>>
>>>>>> Brian
>>>>>>
>>>>>> [1] https://github.com/apache/beam/pull/13843
>>>>>> [2] https://github.com/apache/beam/pull/14324
>>>>>>
>>>>>> On Fri, Apr 2, 2021 at 9:15 AM Robert Bradshaw <rober...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks for trying this out.
>>>>>>>
>>>>>>> Better support for groupby (e.g.
>>>>>>> https://github.com/apache/beam/pull/13843 ,
>>>>>>> https://github.com/apache/beam/pull/13637) will be available in the
>>>>>>> next Beam release (2.29, in progress, but you could try out head if you
>>>>>>> want). Note, however, that Beam PCollections are by definition
>>>>>>> unordered, so unless you sort a partition and immediately do something
>>>>>>> with it that ordering may not be preserved. If you could let us know
>>>>>>> what you're trying to do with this ordering that would be helpful.
>>>>>>>
>>>>>>> - Robert
>>>>>>>
>>>>>>> On Thu, Apr 1, 2021 at 7:31 PM Wenbing Bai <wenbing....@getcruise.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Beam users,
>>>>>>>>
>>>>>>>> I have a use case to partition my PCollection by some key, and
>>>>>>>> then sort my rows within the same partition by some other key.
>>>>>>>>
>>>>>>>> I feel Beam Dataframe could be a candidate solution, but I cannot
>>>>>>>> figure out how to make it work. Specifically, I tried df.groupby where
>>>>>>>> I expect my data will be distributed to different nodes.
>>>>>>>> I also tried df.sort_values, but it will sort my whole dataset,
>>>>>>>> which is not what I need.
>>>>>>>>
>>>>>>>> Can someone shed some light on this?
>>>>>>>>
>>>>>>>> Wenbing Bai
>>>>>>>> Senior Software Engineer
>>>>>>>> Data Infrastructure, Cruise
>>>>>>>> Pronouns: She/Her
>>>>>>>>
>>>>>>>> *Confidentiality Note:* We care about protecting our proprietary
>>>>>>>> information, confidential material, and trade secrets. This
>>>>>>>> message may contain some or all of those things. Cruise will suffer
>>>>>>>> material harm if anyone other than the intended recipient disseminates
>>>>>>>> or takes any action based on this message. If you have received this
>>>>>>>> message (including any attachments) in error, please delete it
>>>>>>>> immediately and notify the sender promptly.