It would also be helpful to understand what your overall objective is with
this output. Is there a reason you need it sorted/partitioned in a
certain way?

On Tue, Apr 20, 2021 at 4:51 PM Brian Hulette <bhule...@google.com> wrote:

> Hi Wenbing,
> Sorry for taking so long to get back to you on this.
> I discussed this with Robert offline and we came up with a potential
> workaround: you could try writing out the Parquet file from within the
> groupby.apply method. You can use Beam's FileSystems abstraction to get a
> writable Python file object referencing a cloud storage file, and pass that
> file object directly to pandas' to_parquet. It would look something like this:
>
>   df.groupby('key1').apply(
>       lambda df: df.sort_values(by='key2').to_parquet(
>           FileSystems.create("gs://bucket/file.pq")))
>
> If writing out sorted, partitioned Parquet files is a common use-case we
> should think about making this easier though. At the very least
> partition_cols should work; I filed BEAM-12201 [1] for this. That alone
> won't be enough, as our implementation will likely reshuffle the dataset to
> enforce the partitioning, removing any sorting that you've applied, so we'd
> also need to think about how to optimize the pipeline to avoid that shuffle.
>
> Brian
>
> [1] https://issues.apache.org/jira/browse/BEAM-12201
>
> On Wed, Apr 7, 2021 at 9:02 PM Wenbing Bai <wenbing....@getcruise.com>
> wrote:
>
>> Thank you, Brian. I tried `partition_cols`, but it does not work in Beam.
>> The same call does work in pure pandas, so I am not sure whether something
>> is wrong with Beam.
>>
>> Wenbing
>>
>> On Wed, Apr 7, 2021 at 2:56 PM Brian Hulette <bhule...@google.com> wrote:
>>
>>> Hm, to_parquet does have a `partition_cols` argument [1] which we pass
>>> through [2]. It would be interesting to see what `partition_cols='key1'`
>>> does - I suspect it won't work perfectly though.
>>>
>>> Do you have any thoughts here Robert?
>>>
>>> [1]
>>> https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_parquet.html
>>> [2]
>>> https://github.com/apache/beam/blob/a8cd05932bed9b2480316fb8518409636cb2733b/sdks/python/apache_beam/dataframe/io.py#L525
>>>
>>> On Wed, Apr 7, 2021 at 2:22 PM Wenbing Bai <wenbing....@getcruise.com>
>>> wrote:
>>>
>>>> Hi Robert and Brian,
>>>>
>>>> I tried groupby in my case. Here is my pipeline code. I do see that the
>>>> data in the final Parquet file is sorted within each group. However, I'd
>>>> like to write each partition (group) to an individual file. How can I
>>>> achieve that? In addition, I am using the master branch of the Apache Beam
>>>> SDK. How can I test the pipeline with DataflowRunner, considering there is
>>>> no Dataflow worker image available?
>>>>
>>>> import typing
>>>> import uuid
>>>> from random import randrange
>>>>
>>>> import apache_beam as beam
>>>> from apache_beam.dataframe.convert import to_dataframe
>>>>
>>>> data = [
>>>>     {
>>>>         "key1": 1000 + i % 10,
>>>>         "key2": randrange(10000),
>>>>         "feature_1": "somestring{}".format(i),
>>>>     } for i in range(10000)
>>>> ]
>>>>
>>>> class TestRow(typing.NamedTuple):
>>>>     key1: int
>>>>     key2: int
>>>>     feature_1: str
>>>>
>>>> with beam.Pipeline() as p:
>>>>     pcoll = (
>>>>         p
>>>>         | beam.Create(data)
>>>>         # Build real TestRow elements so they match the declared type.
>>>>         | beam.Map(lambda x: TestRow(**x)).with_output_types(TestRow)
>>>>     )
>>>>
>>>>     df = to_dataframe(pcoll)
>>>>     # Sort by key2 within each key1 group.
>>>>     sorted_df = df.groupby('key1').apply(
>>>>         lambda df: df.sort_values(by='key2'))
>>>>     sorted_df.to_parquet(
>>>>         'test_beam_dataframe{}.parquet'.format(str(uuid.uuid4())[:8]),
>>>>         engine='pyarrow', index=False)
>>>>
>>>> On Fri, Apr 2, 2021 at 10:00 AM Wenbing Bai <wenbing....@getcruise.com>
>>>> wrote:
>>>>
>>>>> Thank you, Robert and Brian.
>>>>>
>>>>> I'd like to try this out. I am trying to distribute my dataset to
>>>>> nodes, sort each partition by some key and then store each partition to
>>>>> its own file.
>>>>>
>>>>> Wenbing
>>>>>
>>>>> On Fri, Apr 2, 2021 at 9:23 AM Brian Hulette <bhule...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Note groupby.apply [1] in particular should be able to do what you
>>>>>> want, something like:
>>>>>>
>>>>>>   df.groupby('key1').apply(lambda df: df.sort_values('key2'))
>>>>>>
>>>>>> But as Robert noted, we don't make any guarantees about preserving
>>>>>> this ordering later in the pipeline. For this reason I actually just
>>>>>> sent a PR to disallow sort_values on the entire dataset [2].
>>>>>>
>>>>>> Brian
>>>>>>
>>>>>> [1] https://github.com/apache/beam/pull/13843
>>>>>> [2] https://github.com/apache/beam/pull/14324
>>>>>>
>>>>>> On Fri, Apr 2, 2021 at 9:15 AM Robert Bradshaw <rober...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks for trying this out.
>>>>>>>
>>>>>>> Better support for groupby (e.g.
>>>>>>> https://github.com/apache/beam/pull/13843 ,
>>>>>>> https://github.com/apache/beam/pull/13637) will be available in the
>>>>>>> next Beam release (2.29, in progress, but you could try out head if you
>>>>>>> want). Note, however, that Beam PCollections are by definition
>>>>>>> unordered, so unless you sort a partition and immediately do something
>>>>>>> with it, that ordering may not be preserved. If you could let us know
>>>>>>> what you're trying to do with this ordering, that would be helpful.
>>>>>>>
>>>>>>> - Robert
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Apr 1, 2021 at 7:31 PM Wenbing Bai <
>>>>>>> wenbing....@getcruise.com> wrote:
>>>>>>>
>>>>>>>> Hi Beam users,
>>>>>>>>
>>>>>>>> I have a use case where I need to partition my PCollection by some
>>>>>>>> key, and then sort the rows within each partition by some other key.
>>>>>>>>
>>>>>>>> I feel the Beam DataFrame API could be a candidate solution, but I
>>>>>>>> cannot figure out how to make it work. Specifically, I tried
>>>>>>>> df.groupby, expecting it to distribute my data to different nodes. I
>>>>>>>> also tried df.sort_values, but it sorts my whole dataset, which is not
>>>>>>>> what I need.
>>>>>>>>
>>>>>>>> Can someone shed some light on this?
>>>>>>>>
>>>>>>>>
>>>>>>>> Wenbing Bai
>>>>>>>> Senior Software Engineer
>>>>>>>> Data Infrastructure, Cruise
>>>>>>>> Pronouns: She/Her
>>>>>>>>
>>>>>>>> *Confidentiality Note:* We care about protecting our proprietary
>>>>>>>> information, confidential material, and trade secrets. This
>>>>>>>> message may contain some or all of those things. Cruise will suffer
>>>>>>>> material harm if anyone other than the intended recipient
>>>>>>>> disseminates or takes any action based on this message. If you have
>>>>>>>> received this message (including any attachments) in error, please
>>>>>>>> delete it immediately and notify the sender promptly.
>>>>>>>
>>>>>>>
>>>
>>>
>
>