Re: [EXT] Re: [EXT] Re: [EXT] Re: [EXT] Re: [EXT] Re: Beam Dataframe - sort and grouping

2021-05-14 Thread Wenbing Bai
Thank you for the clarification! Is there a way to control the number of
shards, i.e., the bundles? I know that pure Beam IO connectors support
num_shards, for example WriteToParquet
<https://github.com/apache/beam/blob/979269e4519bbe46105f68e0b8aa566bf854d95d/sdks/python/apache_beam/io/parquetio.py#L344>.

Wenbing
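
For reference, this is roughly how num_shards is used with that
PCollection-level connector. A minimal sketch; the schema, records, and
bucket path are made up for illustration:

    import pyarrow
    import apache_beam as beam
    from apache_beam.io.parquetio import WriteToParquet

    schema = pyarrow.schema([('key1', pyarrow.int64()),
                             ('key2', pyarrow.int64())])

    with beam.Pipeline() as p:
        _ = (
            p
            | beam.Create([{'key1': 1, 'key2': 2}, {'key1': 2, 'key2': 1}])
            # num_shards=1 forces a single output file; the default of 0
            # lets the runner choose the sharding.
            | WriteToParquet('gs://bucket/out', schema, num_shards=1,
                             file_name_suffix='.parquet'))

With num_shards=0 (the default) the runner chooses the sharding, which
matches the bundle-driven behavior Robert describes below.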

On Thu, May 13, 2021 at 4:35 PM Robert Bradshaw  wrote:

> Sharding is determined by the distribution of work. Each worker writes to
> its own shard, and in the case of dynamic partitioning, etc. workers may
> end up processing more than one "bundle" of items and hence produce more
> than one shard. See also
> https://beam.apache.org/documentation/runtime/model/
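
If explicit shard control is needed today, one possible workaround (a sketch
of an approach not discussed in the thread; it assumes df is a deferred
dataframe produced by to_dataframe and that a matching pyarrow schema is at
hand) is to convert back to a PCollection and write with the plain
connector, which does expose num_shards:

    import apache_beam as beam
    from apache_beam.dataframe.convert import to_pcollection
    from apache_beam.io.parquetio import WriteToParquet

    # to_pcollection yields the rows back as named tuples; WriteToParquet
    # expects dicts matching the pyarrow schema.
    _ = (
        to_pcollection(df)
        | beam.Map(lambda row: row._asdict())
        | WriteToParquet('gs://bucket/out', schema, num_shards=4,
                         file_name_suffix='.parquet'))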
>
> On Thu, May 13, 2021 at 3:58 PM Wenbing Bai 
> wrote:
>
>> Hi team,
>>
>> I have another question about using the Beam Dataframe IO connector. I
>> tried to_parquet, and my data are written to several different files. I am
>> wondering how I can control the number of files (shards), or how the
>> sharding is done, for to_parquet and the other Beam Dataframe IO APIs?
>>
>> Thank you!
>> Wenbing
>>
>> On Tue, May 11, 2021 at 12:20 PM Kenneth Knowles  wrote:
>>
>>> +dev 
>>>
>>> In the Beam Java ecosystem, this functionality is provided by the Sorter
>>> library (
>>> https://beam.apache.org/documentation/sdks/java-extensions/#sorter).
>>> I'm curious what people think about various options:
>>>
>>>  - Python version of the transform(s)
>>>  - Expose sorter as xlang transform(s)
>>>  - Convenience transforms (that use pandas in DoFns?) to just do it for
>>> small data per key to achieve compatibility
>>>  - Beam model extension so that runners can do it as part of GBK
>>>
>>> Kenn
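
To make the third option concrete, a convenience transform could group by
key and sort each group with pandas inside a DoFn. A rough sketch
(hypothetical code: it assumes pcoll is a PCollection of dicts keyed by
'key1' and that each key's data fits in memory):

    import pandas as pd
    import apache_beam as beam

    def sort_group(kv):
        # kv is (key1, iterable of row dicts); this materializes one whole
        # group in memory, so it is only suitable for small data per key.
        _key, rows = kv
        return pd.DataFrame(rows).sort_values(by='key2').to_dict('records')

    sorted_per_key = (
        pcoll
        | beam.Map(lambda row: (row['key1'], row))
        | beam.GroupByKey()
        | beam.FlatMap(sort_group))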
>>>
>>> On Mon, May 10, 2021 at 5:26 PM Wenbing Bai 
>>> wrote:
>>>
>>>> Hi Robert and Brian,
>>>>
>>>> I don't know why I didn't catch your replies. But thank you so much for
>>>> looking at this.
>>>>
>>>> My parquet files will be consumed by downstream processes which
>>>> require data points with the same "key1" to be sorted by "key2". A
>>>> downstream process, for example, will apply a rolling window of size N
>>>> that reads N records together at one time. Note, though, that the
>>>> rolling window will not cross different "key1" values.
>>>>
>>>> That is to say: 1) I don't need to sort the whole dataset, and 2) all
>>>> data with the same "key1" should be located together.
>>>>
>>>> I am not sure if I explained the use case clearly. Let me know what you
>>>> think.
>>>>
>>>> Wenbing
>>>>
>>>>
>>>> On Tue, Apr 20, 2021 at 5:01 PM Robert Bradshaw 
>>>> wrote:
>>>>
>>>>> It would also be helpful to understand what your overall objective is
>>>>> with this output. Is there a reason you need it sorted/partitioned in a
>>>>> certain way?
>>>>>
>>>>> On Tue, Apr 20, 2021 at 4:51 PM Brian Hulette 
>>>>> wrote:
>>>>>
>>>>>> Hi Wenbing,
>>>>>> Sorry for taking so long to get back to you on this.
>>>>>> I discussed this with Robert offline and we came up with a potential
>>>>>> workaround - you could try writing out the Parquet file from within
>>>>>> the groupby.apply method. You can use Beam's FileSystems abstraction
>>>>>> to open a Python file object referencing a cloud storage file, and
>>>>>> pass that file object directly to the pandas to_parquet. It would
>>>>>> look something like this:
>>>>>>
>>>>>>   df.groupby('key1').apply(lambda df:
>>>>>>       df.sort_values(by='key2').to_parquet(
>>>>>>           FileSystems.open("gs://bucket/file.pq")))
>>>>>>
>>>>>> If writing out sorted, partitioned parquet files is a common use-case
>>>>>> we should think about making this easier though. At the very least
>>>>>> partition_cols should work, I filed BEAM-12201 [1] for this. That
>>>>>> alone won't be enough as our implementation will likely reshuffle the
>>>>>> dataset to enforce the partitioning, removing any sorting that you've
>>>>>> applied, so we'd also need to think about how to optimize the
>>>>>> pipeline to avoid that shuffle.
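
To flesh out the snippet above into something closer to runnable (a sketch,
with an illustrative bucket path and a hypothetical write_group helper;
note that FileSystems.create is the call that opens a file for writing,
while FileSystems.open opens one for reading):

    import pandas as pd
    from apache_beam.io.filesystems import FileSystems

    def write_group(df):
        # Write one group's rows, sorted by key2, to its own object,
        # deriving the file name from the group's key1 value.
        key = df['key1'].iloc[0]
        f = FileSystems.create('gs://bucket/part-{}.parquet'.format(key))
        df.sort_values(by='key2').to_parquet(f, engine='pyarrow', index=False)
        f.close()

    df.groupby('key1').apply(write_group)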

Re: [EXT] Re: [EXT] Re: [EXT] Re: [EXT] Re: Beam Dataframe - sort and grouping

2021-05-13 Thread Wenbing Bai
Hi team,

I have another question about using the Beam Dataframe IO connector. I tried
to_parquet, and my data are written to several different files. I am
wondering how I can control the number of files (shards), or how the
sharding is done, for to_parquet and the other Beam Dataframe IO APIs?

Thank you!
Wenbing

On Tue, May 11, 2021 at 12:20 PM Kenneth Knowles  wrote:

> +dev 
>
> In the Beam Java ecosystem, this functionality is provided by the Sorter
> library (
> https://beam.apache.org/documentation/sdks/java-extensions/#sorter). I'm
> curious what people think about various options:
>
>  - Python version of the transform(s)
>  - Expose sorter as xlang transform(s)
>  - Convenience transforms (that use pandas in DoFns?) to just do it for
> small data per key to achieve compatibility
>  - Beam model extension so that runners can do it as part of GBK
>
> Kenn
>
> On Mon, May 10, 2021 at 5:26 PM Wenbing Bai 
> wrote:
>
>> Hi Robert and Brian,
>>
>> I don't know why I didn't catch your replies. But thank you so much for
>> looking at this.
>>
>> My parquet files will be consumed by downstream processes which require
>> data points with the same "key1" to be sorted by "key2". A downstream
>> process, for example, will apply a rolling window of size N that reads N
>> records together at one time. Note, though, that the rolling window will
>> not cross different "key1" values.
>>
>> That is to say: 1) I don't need to sort the whole dataset, and 2) all
>> data with the same "key1" should be located together.
>>
>> I am not sure if I explained the use case clearly. Let me know what you
>> think.
>>
>> Wenbing
>>
>>
>> On Tue, Apr 20, 2021 at 5:01 PM Robert Bradshaw 
>> wrote:
>>
>>> It would also be helpful to understand what your overall objective is
>>> with this output. Is there a reason you need it sorted/partitioned in a
>>> certain way?
>>>
>>> On Tue, Apr 20, 2021 at 4:51 PM Brian Hulette 
>>> wrote:
>>>
>>>> Hi Wenbing,
>>>> Sorry for taking so long to get back to you on this.
>>>> I discussed this with Robert offline and we came up with a potential
>>>> workaround - you could try writing out the Parquet file from within the
>>>> groupby.apply method. You can use Beam's FileSystems abstraction to open a
>>>> Python file object referencing a cloud storage file, and pass that file
>>>> object directly to the pandas to_parquet. It would look something like
>>>> this:
>>>>
>>>>   df.groupby('key1').apply(lambda df:
>>>>       df.sort_values(by='key2').to_parquet(
>>>>           FileSystems.open("gs://bucket/file.pq")))
>>>>
>>>> If writing out sorted, partitioned parquet files is a common use-case
>>>> we should think about making this easier though. At the very least
>>>> partition_cols should work, I filed BEAM-12201 [1] for this. That alone
>>>> won't be enough as our implementation will likely reshuffle the dataset
>>>> to enforce the partitioning, removing any sorting that you've applied,
>>>> so we'd also need to think about how to optimize the pipeline to avoid
>>>> that shuffle.
>>>>
>>>> Brian
>>>>
>>>> [1] https://issues.apache.org/jira/browse/BEAM-12201
>>>>
>>>> On Wed, Apr 7, 2021 at 9:02 PM Wenbing Bai 
>>>> wrote:
>>>>
>>>>> Thank you, Brian. I tried `partition_cols`, but it is not working. With
>>>>> pure pandas it does work, so I am not sure if anything is wrong with
>>>>> Beam.
>>>>>
>>>>> Wenbing
>>>>>
>>>>> On Wed, Apr 7, 2021 at 2:56 PM Brian Hulette 
>>>>> wrote:
>>>>>
>>>>>> Hm, to_parquet does have a `partition_cols` argument [1] which we
>>>>>> pass through [2]. It would be interesting to see what
>>>>>> `partition_cols='key1'` does - I suspect it won't work perfectly though.
>>>>>>
>>>>>> Do you have any thoughts here Robert?
>>>>>>
>>>>>> [1]
>>>>>> https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_parquet.html
>>>>>> [2]
>>>>>> https://github.com/apache/beam/blob/a8cd05932bed9b2480316fb8518409636cb2733b/sdks/python/apache_beam/dataframe/io.py#L525
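
For comparison, this is what partition_cols does in plain pandas: with the
pyarrow engine it writes a key1=<value> subdirectory per distinct key. A
minimal sketch with made-up data and an illustrative output directory:

    import pandas as pd

    df = pd.DataFrame({'key1': [1, 1, 2], 'key2': [3, 1, 2]})
    # Produces out/key1=1/... and out/key1=2/..., one subdirectory per key.
    df.to_parquet('out', engine='pyarrow', partition_cols=['key1'])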
>>>>>>
>>>>>> On Wed, Apr 7, 2021 at 2:22 PM Wenbing Bai 
>>>>>> wrote:

Re: [EXT] Re: [EXT] Re: [EXT] Re: Beam Dataframe - sort and grouping

2021-05-10 Thread Wenbing Bai
Hi Robert and Brian,

I don't know why I didn't catch your replies. But thank you so much for
looking at this.

My parquet files will be consumed by downstream processes which require
data points with the same "key1" to be sorted by "key2". A downstream
process, for example, will apply a rolling window of size N that reads N
records together at one time. Note, though, that the rolling window will
not cross different "key1" values.

That is to say: 1) I don't need to sort the whole dataset, and 2) all data
with the same "key1" should be located together.

I am not sure if I explained the use case clearly. Let me know what you think.

Wenbing


On Tue, Apr 20, 2021 at 5:01 PM Robert Bradshaw  wrote:

> It would also be helpful to understand what your overall objective is with
> this output. Is there a reason you need it sorted/partitioned in a
> certain way?
>
> On Tue, Apr 20, 2021 at 4:51 PM Brian Hulette  wrote:
>
>> Hi Wenbing,
>> Sorry for taking so long to get back to you on this.
>> I discussed this with Robert offline and we came up with a potential
>> workaround - you could try writing out the Parquet file from within the
>> groupby.apply method. You can use Beam's FileSystems abstraction to open a
>> Python file object referencing a cloud storage file, and pass that file
>> object directly to the pandas to_parquet. It would look something like this:
>>
>>   df.groupby('key1').apply(lambda df:
>>       df.sort_values(by='key2').to_parquet(
>>           FileSystems.open("gs://bucket/file.pq")))
>>
>> If writing out sorted, partitioned parquet files is a common use-case we
>> should think about making this easier though. At the very least
>> partition_cols should work, I filed BEAM-12201 [1] for this. That alone
>> won't be enough as our implementation will likely reshuffle the dataset to
>> enforce the partitioning, removing any sorting that you've applied, so we'd
>> also need to think about how to optimize the pipeline to avoid that shuffle.
>>
>> Brian
>>
>> [1] https://issues.apache.org/jira/browse/BEAM-12201
>>
>> On Wed, Apr 7, 2021 at 9:02 PM Wenbing Bai 
>> wrote:
>>
>>> Thank you, Brian. I tried `partition_cols`, but it is not working. With
>>> pure pandas it does work, so I am not sure if anything is wrong with
>>> Beam.
>>>
>>> Wenbing
>>>
>>> On Wed, Apr 7, 2021 at 2:56 PM Brian Hulette 
>>> wrote:
>>>
>>>> Hm, to_parquet does have a `partition_cols` argument [1] which we pass
>>>> through [2]. It would be interesting to see what  `partition_cols='key1'`
>>>> does - I suspect it won't work perfectly though.
>>>>
>>>> Do you have any thoughts here Robert?
>>>>
>>>> [1]
>>>> https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_parquet.html
>>>> [2]
>>>> https://github.com/apache/beam/blob/a8cd05932bed9b2480316fb8518409636cb2733b/sdks/python/apache_beam/dataframe/io.py#L525
>>>>
>>>> On Wed, Apr 7, 2021 at 2:22 PM Wenbing Bai 
>>>> wrote:
>>>>
>>>>> Hi Robert and Brian,
>>>>>
>>>>> I tried groupby in my case. Here is my pipeline code. I do see that the
>>>>> data in the final parquet file are sorted within each group. However,
>>>>> I'd like to write each partition (group) to an individual file; how can
>>>>> I achieve that? In addition, I am using the Apache Beam SDK built from
>>>>> master; how can I test the pipeline with DataflowRunner considering
>>>>> there is no dataflow worker image available?
>>>>>
>>>>> import typing
>>>>> import uuid
>>>>> from random import randrange
>>>>>
>>>>> import apache_beam as beam
>>>>> from apache_beam.dataframe.convert import to_dataframe
>>>>>
>>>>> data = [
>>>>>     {
>>>>>         "key1": 1000 + i % 10,
>>>>>         "key2": randrange(1),
>>>>>         "feature_1": "somestring{}".format(i)
>>>>>     } for i in range(1)
>>>>> ]
>>>>>
>>>>> class TestRow(typing.NamedTuple):
>>>>>     key1: int
>>>>>     key2: int
>>>>>     feature_1: str
>>>>>
>>>>> with beam.Pipeline() as p:
>>>>>     pcoll = (
>>>>>         p
>>>>>         | beam.Create(data)
>>>>>         | beam.Map(lambda x: x).with_output_types(TestRow)
>>>>>     )
>>>>>
>>>>>     df = to_dataframe(pcoll)
>>>>>     sorted_df = df.groupby('key1').apply(
>>>>>         lambda df: df.sort_values(by='key2'))
>>>>>     sorted_df.to_parquet(
>>>>>         'test_beam_dataframe{}.parquet'.format(str(uuid.uuid4())[:8]),
>>>>>         engine='pyarrow', index=False)

Re: [EXT] Re: [EXT] Re: Beam Dataframe - sort and grouping

2021-04-07 Thread Wenbing Bai
Thank you, Brian. I tried `partition_cols`, but it is not working. With pure
pandas it does work, so I am not sure if anything is wrong with Beam.

Wenbing

On Wed, Apr 7, 2021 at 2:56 PM Brian Hulette  wrote:

> Hm, to_parquet does have a `partition_cols` argument [1] which we pass
> through [2]. It would be interesting to see what  `partition_cols='key1'`
> does - I suspect it won't work perfectly though.
>
> Do you have any thoughts here Robert?
>
> [1]
> https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_parquet.html
> [2]
> https://github.com/apache/beam/blob/a8cd05932bed9b2480316fb8518409636cb2733b/sdks/python/apache_beam/dataframe/io.py#L525
>
> On Wed, Apr 7, 2021 at 2:22 PM Wenbing Bai 
> wrote:
>
>> Hi Robert and Brian,
>>
>> I tried groupby in my case. Here is my pipeline code. I do see that the
>> data in the final parquet file are sorted within each group. However, I'd
>> like to write each partition (group) to an individual file; how can I
>> achieve that? In addition, I am using the Apache Beam SDK built from
>> master; how can I test the pipeline with DataflowRunner considering there
>> is no dataflow worker image available?
>>
>> import typing
>> import uuid
>> from random import randrange
>>
>> import apache_beam as beam
>> from apache_beam.dataframe.convert import to_dataframe
>>
>> data = [
>>     {
>>         "key1": 1000 + i % 10,
>>         "key2": randrange(1),
>>         "feature_1": "somestring{}".format(i)
>>     } for i in range(1)
>> ]
>>
>> class TestRow(typing.NamedTuple):
>>     key1: int
>>     key2: int
>>     feature_1: str
>>
>> with beam.Pipeline() as p:
>>     pcoll = (
>>         p
>>         | beam.Create(data)
>>         | beam.Map(lambda x: x).with_output_types(TestRow)
>>     )
>>
>>     df = to_dataframe(pcoll)
>>     sorted_df = df.groupby('key1').apply(
>>         lambda df: df.sort_values(by='key2'))
>>     sorted_df.to_parquet(
>>         'test_beam_dataframe{}.parquet'.format(str(uuid.uuid4())[:8]),
>>         engine='pyarrow', index=False)
>>
>> On Fri, Apr 2, 2021 at 10:00 AM Wenbing Bai 
>> wrote:
>>
>>> Thank you, Robert and Brian.
>>>
>>> I'd like to try this out. I am trying to distribute my dataset to nodes,
>>> sort each partition by some key and then store each partition to its own
>>> file.
>>>
>>> Wenbing
>>>
>>> On Fri, Apr 2, 2021 at 9:23 AM Brian Hulette 
>>> wrote:
>>>
>>>> Note groupby.apply [1] in particular should be able to do what you
>>>> want, something like:
>>>>
>>>>   df.groupby('key1').apply(lambda df: df.sort_values('key2'))
>>>>
>>>> But as Robert noted we don't make any guarantees about preserving this
>>>> ordering later in the pipeline. For this reason I actually just sent a PR
>>>> to disallow sort_values on the entire dataset [2].
>>>>
>>>> Brian
>>>>
>>>> [1] https://github.com/apache/beam/pull/13843
>>>> [2] https://github.com/apache/beam/pull/14324
>>>>
>>>> On Fri, Apr 2, 2021 at 9:15 AM Robert Bradshaw 
>>>> wrote:
>>>>
>>>>> Thanks for trying this out.
>>>>>
>>>>> Better support for groupby (e.g.
>>>>> https://github.com/apache/beam/pull/13843 ,
>>>>> https://github.com/apache/beam/pull/13637) will be available in the
>>>>> next Beam release (2.29, in progress, but you could try out head if you
>>>>> want). Note, however, that Beam PCollections are by definition unordered,
>>>>> so unless you sort a partition and immediately do something with it that
>>>>> ordering may not be preserved. If you could let us know what you're trying
>>>>> to do with this ordering that would be helpful.
>>>>>
>>>>> - Robert
>>>>>
>>>>>
>>>>> On Thu, Apr 1, 2021 at 7:31 PM Wenbing Bai 
>>>>> wrote:
>>>>>
>>>>>> Hi Beam users,
>>>>>>
>>>>>> I have a use case to partition my PCollection by some key, and then
>>>>>> sort my rows within the same partition by some other key.
>>>>>>
>>>>>> I feel Beam Dataframe could be a candidate solution, but I cannot
>>>>>> figure out how to make it work. Specifically, I tried df.groupby where I
>>>>>> expect my data will be distributed to different nodes. I also tried
>>>>>> df.sort_values, but it will sort my whole dataset, which is not what I 
>>>>>> need.
>>>>>>
>>>>>> Can someone shed some light on this?

Re: [EXT] Re: Beam Dataframe - sort and grouping

2021-04-07 Thread Wenbing Bai
Hi Robert and Brian,

I tried groupby in my case. Here is my pipeline code. I do see that the data
in the final parquet file are sorted within each group. However, I'd like to
write each partition (group) to an individual file; how can I achieve that?
In addition, I am using the Apache Beam SDK built from master; how can I
test the pipeline with DataflowRunner considering there is no dataflow
worker image available?

import typing
import uuid
from random import randrange

import apache_beam as beam
from apache_beam.dataframe.convert import to_dataframe

data = [
    {
        "key1": 1000 + i % 10,
        "key2": randrange(1),
        "feature_1": "somestring{}".format(i)
    } for i in range(1)
]

class TestRow(typing.NamedTuple):
    key1: int
    key2: int
    feature_1: str

with beam.Pipeline() as p:
    pcoll = (
        p
        | beam.Create(data)
        | beam.Map(lambda x: x).with_output_types(TestRow)
    )

    df = to_dataframe(pcoll)
    sorted_df = df.groupby('key1').apply(
        lambda df: df.sort_values(by='key2'))
    sorted_df.to_parquet(
        'test_beam_dataframe{}.parquet'.format(str(uuid.uuid4())[:8]),
        engine='pyarrow', index=False)

On Fri, Apr 2, 2021 at 10:00 AM Wenbing Bai 
wrote:

> Thank you, Robert and Brian.
>
> I'd like to try this out. I am trying to distribute my dataset to nodes,
> sort each partition by some key and then store each partition to its own
> file.
>
> Wenbing
>
> On Fri, Apr 2, 2021 at 9:23 AM Brian Hulette  wrote:
>
>> Note groupby.apply [1] in particular should be able to do what you want,
>> something like:
>>
>>   df.groupby('key1').apply(lambda df: df.sort_values('key2'))
>>
>> But as Robert noted we don't make any guarantees about preserving this
>> ordering later in the pipeline. For this reason I actually just sent a PR
>> to disallow sort_values on the entire dataset [2].
>>
>> Brian
>>
>> [1] https://github.com/apache/beam/pull/13843
>> [2] https://github.com/apache/beam/pull/14324
>>
>> On Fri, Apr 2, 2021 at 9:15 AM Robert Bradshaw 
>> wrote:
>>
>>> Thanks for trying this out.
>>>
>>> Better support for groupby (e.g.
>>> https://github.com/apache/beam/pull/13843 ,
>>> https://github.com/apache/beam/pull/13637) will be available in the
>>> next Beam release (2.29, in progress, but you could try out head if you
>>> want). Note, however, that Beam PCollections are by definition unordered,
>>> so unless you sort a partition and immediately do something with it that
>>> ordering may not be preserved. If you could let us know what you're trying
>>> to do with this ordering that would be helpful.
>>>
>>> - Robert
>>>
>>>
>>> On Thu, Apr 1, 2021 at 7:31 PM Wenbing Bai 
>>> wrote:
>>>
>>>> Hi Beam users,
>>>>
>>>> I have a use case to partition my PCollection by some key, and then
>>>> sort my rows within the same partition by some other key.
>>>>
>>>> I feel Beam Dataframe could be a candidate solution, but I cannot
>>>> figure out how to make it work. Specifically, I tried df.groupby where I
>>>> expect my data will be distributed to different nodes. I also tried
>>>> df.sort_values, but it will sort my whole dataset, which is not what I 
>>>> need.
>>>>
>>>> Can someone shed some light on this?
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> Wenbing Bai
>>>>
>>>> Senior Software Engineer
>>>>
>>>> Data Infrastructure, Cruise
>>>>
>>>> Pronouns: She/Her
>>>>
>>>>
>>>>
>>>
>>>
>
> --
>
>
>
>
>
> Wenbing Bai
>
> Senior Software Engineer
>
> Data Infrastructure, Cruise
>
> Pronouns: She/Her
>
>

-- 





Wenbing Bai

Senior Software Engineer

Data Infrastructure, Cruise

Pronouns: She/Her



Re: [EXT] Re: Beam Dataframe - sort and grouping

2021-04-02 Thread Wenbing Bai
Thank you, Robert and Brian.

I'd like to try this out. I am trying to distribute my dataset to nodes,
sort each partition by some key and then store each partition to its own
file.

Wenbing

On Fri, Apr 2, 2021 at 9:23 AM Brian Hulette  wrote:

> Note groupby.apply [1] in particular should be able to do what you want,
> something like:
>
>   df.groupby('key1').apply(lambda df: df.sort_values('key2'))
>
> But as Robert noted we don't make any guarantees about preserving this
> ordering later in the pipeline. For this reason I actually just sent a PR
> to disallow sort_values on the entire dataset [2].
>
> Brian
>
> [1] https://github.com/apache/beam/pull/13843
> [2] https://github.com/apache/beam/pull/14324
>
> On Fri, Apr 2, 2021 at 9:15 AM Robert Bradshaw 
> wrote:
>
>> Thanks for trying this out.
>>
>> Better support for groupby (e.g.
>> https://github.com/apache/beam/pull/13843 ,
>> https://github.com/apache/beam/pull/13637) will be available in the next
>> Beam release (2.29, in progress, but you could try out head if you want).
>> Note, however, that Beam PCollections are by definition unordered, so
>> unless you sort a partition and immediately do something with it that
>> ordering may not be preserved. If you could let us know what you're trying
>> to do with this ordering that would be helpful.
>>
>> - Robert
>>
>>
>> On Thu, Apr 1, 2021 at 7:31 PM Wenbing Bai 
>> wrote:
>>
>>> Hi Beam users,
>>>
>>> I have a use case to partition my PCollection by some key, and then
>>> sort my rows within the same partition by some other key.
>>>
>>> I feel Beam Dataframe could be a candidate solution, but I cannot figure
>>> out how to make it work. Specifically, I tried df.groupby where I expect my
>>> data will be distributed to different nodes. I also tried df.sort_values,
>>> but it will sort my whole dataset, which is not what I need.
>>>
>>> Can someone shed some light on this?
>>>
>>>
>>>
>>>
>>>
>>> Wenbing Bai
>>>
>>> Senior Software Engineer
>>>
>>> Data Infrastructure, Cruise
>>>
>>> Pronouns: She/Her
>>>
>>>
>>>
>>
>>

-- 





Wenbing Bai

Senior Software Engineer

Data Infrastructure, Cruise

Pronouns: She/Her



Beam Dataframe - sort and grouping

2021-04-01 Thread Wenbing Bai
Hi Beam users,

I have a use case to partition my PCollection by some key, and then sort
my rows within the same partition by some other key.

I feel Beam Dataframe could be a candidate solution, but I cannot figure
out how to make it work. Specifically, I tried df.groupby where I expect my
data will be distributed to different nodes. I also tried df.sort_values,
but it will sort my whole dataset, which is not what I need.

Can someone shed some light on this?





Wenbing Bai

Senior Software Engineer

Data Infrastructure, Cruise

Pronouns: She/Her
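
For context, the same "partition by one key, sort within the partition"
shape can be expressed without the dataframe API. A sketch, assuming dict
records and groups small enough to fit in memory; as the replies elsewhere
in this thread note, PCollections are unordered, so the per-group order
holds only inside each grouped value:

    import apache_beam as beam

    with beam.Pipeline() as p:
        sorted_groups = (
            p
            | beam.Create([{'key1': 1, 'key2': 2}, {'key1': 1, 'key2': 1}])
            | beam.Map(lambda row: (row['key1'], row))
            | beam.GroupByKey()
            # sorted() materializes one group per element.
            | beam.MapTuple(
                lambda key, rows: (key, sorted(rows, key=lambda r: r['key2']))))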



Help needed on Dataflow worker exception of WriteToBigQuery

2020-02-24 Thread Wenbing Bai
Hi there,

I am using WriteToBigQuery in the apache-beam Python SDK 2.16. I get this
error when I run my pipeline on the Dataflow runner.

RuntimeError: IOError: [Errno 2] Not found:
gs://tmp-e3271c8deb2f655-0-of-1.avro [while running
'WriteToBigQuery/BigQueryBatchFileLoads/ParDo(WriteRecordsToFile)/ParDo(WriteRecordsToFile)/ParDo(WriteRecordsToFile)']

Has anyone seen this before? Can I get any hints on where the Dataflow
worker writes the Avro data?

-- 





Wenbing Bai

Senior Software Engineer, MLP

Cruise

Pronouns: She/Her
