Ensuring a task does not get executed concurrently

2023-06-12 Thread Stephan Hoyer via dev
Can the Beam data model (specifically the Python SDK) support executing functions that are idempotent but not concurrency-safe? I am thinking of a task like setting up a database (or in my case, a Zarr store in Xarray-Beam) where it is…
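
A minimal sketch of one way to arrange this, funneling the side effect through a single-element PCollection so it executes as one task (setup_store and process are hypothetical names; runner retries can still re-execute the function, which is exactly where idempotency matters):

    import apache_beam as beam

    def setup_store(_):
        # Idempotent but not concurrency-safe setup, e.g. creating a
        # database table or initializing a Zarr store.
        pass

    with beam.Pipeline() as p:
        setup_done = p | beam.Create([None]) | beam.Map(setup_store)
        # Downstream stages can wait for completion via a side input:
        # ... | beam.Map(process, _done=beam.pvalue.AsSingleton(setup_done))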

Re: Hierarchical fanout with Beam combiners?

2023-05-27 Thread Stephan Hoyer via dev
…=30, which means combining up to 3 GB of data on a single machine. This is probably fine but doesn't leave a large margin for error. > On Fri, May 26, 2023 at 1:57 PM Stephan Hoyer via dev <dev@beam.apache.org> wrote: > >> We have some use-cases where we are combining ov…

Hierarchical fanout with Beam combiners?

2023-05-26 Thread Stephan Hoyer via dev
We have some use-cases where we are combining over very large sets (e.g., computing the average of 1e5 to 1e6 elements, corresponding to hourly weather observations over the past 50 years). "with_hot_key_fanout" seems to be rather essential for performing these calculations, but as far as I can…
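
For reference, a minimal sketch of the mechanism the thread refers to (the data and fanout value are illustrative, not from the thread):

    import apache_beam as beam

    with beam.Pipeline() as p:
        observations = p | beam.Create([('station1', 1.5), ('station1', 2.5)])
        # Values for each hot key are first partially combined under ~30
        # intermediate keys, then merged in a final combine step.
        means = observations | beam.CombinePerKey(
            beam.combiners.MeanCombineFn()).with_hot_key_fanout(30)
        # The unkeyed analogue is beam.CombineGlobally(...).with_fanout(30).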

beam.Create(range(N)) without building a sequence in memory

2022-09-19 Thread Stephan Hoyer via dev
Many of my Beam pipelines start with partitioning over some large, statically known number of inputs that could be created from a list of sequential integers. In Python, these sequential integers can be efficiently represented with a range() object, which stores just the start, stop, and step values.
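
One common workaround (a sketch, not necessarily what the thread settled on) is to expand the range lazily from a single seed element and then reshuffle for parallelism:

    import apache_beam as beam

    N = 10_000_000

    with beam.Pipeline() as p:
        indices = (
            p
            | beam.Create([0])                  # a single seed element
            | beam.FlatMap(lambda _: range(N))  # lazily yields 0..N-1
            | beam.Reshuffle()                  # redistribute across workers
        )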

Re: Cartesian product of PCollections

2022-09-19 Thread Stephan Hoyer via dev
> > > > My team has an internal implementation of a CartesianProduct > transform, based on using hashing to split a PCollection into a finite > number of groups and CoGroupByKey. > > > > Could this be contributed to Beam? > If it would be of broader interest, I would be happy to work on this for…
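
A rough sketch of the shape such a transform could take, following the hashing scheme described above (the shard count and helper name are illustrative, not the internal implementation):

    import apache_beam as beam

    NUM_SHARDS = 10  # trades per-shard memory for parallelism

    def cartesian_product(left, right, n=NUM_SHARDS):
        # Replicate every left element onto all n shards, hash each right
        # element onto exactly one shard, then join shard by shard.
        # Note: Python's built-in hash is salted per process for strings;
        # a stable hash function may be preferable in practice.
        left_keyed = left | 'FanOutLeft' >> beam.FlatMap(
            lambda x: ((i, x) for i in range(n)))
        right_keyed = right | 'KeyRight' >> beam.Map(
            lambda y: (hash(y) % n, y))
        return (
            {'left': left_keyed, 'right': right_keyed}
            | 'Join' >> beam.CoGroupByKey()
            | 'Pairs' >> beam.FlatMap(
                lambda kv: ((x, y)
                            for x in kv[1]['left']
                            for y in kv[1]['right']))
        )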

Cartesian product of PCollections

2022-09-19 Thread Stephan Hoyer via dev
I'm wondering if it would make sense to have a built-in Beam transformation for calculating the Cartesian product of PCollections. Just this past week, I've encountered two separate cases where calculating a Cartesian product was a bottleneck. The in-memory option of using something like Python's…
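
When one side is small enough to broadcast, a side input already gives a simple Cartesian product without any new transform (a sketch, not a proposed Beam API):

    import apache_beam as beam

    with beam.Pipeline() as p:
        left = p | 'Left' >> beam.Create(['a', 'b'])
        right = p | 'Right' >> beam.Create([1, 2, 3])
        product = left | beam.FlatMap(
            lambda x, ys: ((x, y) for y in ys),
            ys=beam.pvalue.AsIter(right))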

Re: Consider Cloudpickle instead of dill for Python pickling

2021-06-08 Thread Stephan Hoyer
…>>> However, I have a better idea - why don't you simply vendor-in either >>> `dill` or `cloudpickle` (I am not sure which one is best)? >>> >>> Since you are not planning to upgrade it often (that's the whole point >>> of narrow versioning), you c…

Re: Out of band pickling in Python (pickle5)

2021-05-27 Thread Stephan Hoyer
…st >> collections of numpy arrays. >> >> Brian >> >> [1] https://github.com/pandas-dev/pandas/issues/34244 >> >> On Tue, May 25, 2021 at 2:59 PM Stephan Hoyer wrote: >> >>> Beam's PickleCoder would need to be updated to pass the >>> "bu…

Re: Out of band pickling in Python (pickle5)

2021-05-25 Thread Stephan Hoyer
…orms in my case, but I would have very much appreciated faster >> serialization performance. >> >> Thanks, >> Evan >> >> On Tue, May 25, 2021 at 15:26 Stephan Hoyer wrote: >> >>> Has anyone looked into out of band pickling for Beam's Python SDK, i.e.…

Out of band pickling in Python (pickle5)

2021-05-25 Thread Stephan Hoyer
Has anyone looked into out of band pickling for Beam's Python SDK, i.e., Pickle protocol version 5? https://www.python.org/dev/peps/pep-0574/ https://docs.python.org/3/library/pickle.html#out-of-band-buffers For Beam pipelines passing around NumPy arrays (or collections of NumPy arrays, like…
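
In standard-library terms, the PEP 574 mechanism looks like this; the question is whether Beam's coders could thread the buffers through (the sketch assumes a NumPy array, which supports protocol 5):

    import pickle
    import numpy as np

    arr = np.zeros((4096, 4096))

    buffers = []
    # Protocol 5 hands large buffers to buffer_callback instead of copying
    # them into the pickle byte stream.
    data = pickle.dumps(arr, protocol=5, buffer_callback=buffers.append)
    # The buffers can be transferred zero-copy and reattached on load:
    restored = pickle.loads(data, buffers=buffers)
    assert (arr == restored).all()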

Re: Apply a Beam PTransform per key

2021-05-24 Thread Stephan Hoyer
The xarray_beam.Rechunk() transform includes a few GroupByKey transforms inside and definitely cannot operate in-memory. On Mon, May 24, 2021 at 4:12 PM Reuven Lax wrote: > Can you explain a bit more? Where are these data sets coming from? > > On Mon, May 24, 2021 at 3:55 PM Stephan Hoyer wro…

Re: Apply a Beam PTransform per key

2021-05-24 Thread Stephan Hoyer
…such datasets (ideally without needing to know that number 20 ahead of time). On Mon, May 24, 2021 at 3:30 PM Reuven Lax wrote: > Is the issue that you have a different topology depending on the key? > > On Mon, May 24, 2021 at 2:49 PM Stephan Hoyer wrote: > >> Exactly, my us…

Re: Apply a Beam PTransform per key

2021-05-24 Thread Stephan Hoyer
…implicitly rather than just a parameter to GBK - that would require all >>> transforms to expose how they use GBK under the hood and they would all >>> have to plumb this extra key/WindowFn through every API. Instead, we have >>> this way to implicitly add a second key to an…

Apply a Beam PTransform per key

2021-05-21 Thread Stephan Hoyer
I'd like to write a Beam PTransform that applies an *existing* Beam transform to each set of grouped values, separately, and combines the result. Is anything like this possible with Beam using the Python SDK? Here are the closest things I've come up with: 1. If each set of *inputs* to my…
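
For a small, statically known set of keys, beam.Partition gives a partial workaround (a sketch; apply_per_key and make_transform are hypothetical, and each partition gets a fresh transform instance so labels stay unique):

    import apache_beam as beam

    def apply_per_key(pcoll, make_transform, keys):
        # Route each (key, value) element to its key's partition, apply a
        # fresh instance of the transform per partition, then merge.
        index = {key: i for i, key in enumerate(keys)}
        parts = pcoll | beam.Partition(lambda kv, _: index[kv[0]], len(keys))
        results = [
            part | f'PerKey{i}' >> make_transform()
            for i, part in enumerate(parts)
        ]
        return tuple(results) | beam.Flatten()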

Re: Proposal: Generalize S3FileSystem

2021-05-20 Thread Stephan Hoyer
On Thu, May 20, 2021 at 10:12 AM Chad Dombrova wrote: > Hi Brian, > I think the main goal would be to make a python package that could be pip > installed independently of apache_beam. That goal could be accomplished > with option 3, thus preserving all of the benefits of a monorepo. If it…

Re: Transform-specific thread pools in Python

2021-05-11 Thread Stephan Hoyer
On Mon, May 10, 2021 at 4:28 PM Ahmet Altay wrote: > > > On Mon, May 10, 2021 at 8:01 AM Stephan Hoyer wrote: > >> Hi Beam devs, >> >> I've been exploring recently how to optimize IO bound steps for my Python >> Beam pipelines, and have come up with a solut…

Transform-specific thread pools in Python

2021-05-10 Thread Stephan Hoyer
Hi Beam devs, I've been exploring recently how to optimize IO bound steps for my Python Beam pipelines, and have come up with a solution that I think might make sense to upstream into Beam's Python SDK. It appears that Beam runners (at least the Cloud Dataflow runner) typically use only a single…
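
The gist of such a solution might look like this (a sketch; ThreadedMapFn and fetch_url are illustrative names, not the thread's actual proposal):

    import concurrent.futures
    import apache_beam as beam

    class ThreadedMapFn(beam.DoFn):
        """Applies an IO-bound function to batches via a thread pool."""

        def __init__(self, fn, num_threads=16):
            self._fn = fn
            self._num_threads = num_threads

        def setup(self):
            self._pool = concurrent.futures.ThreadPoolExecutor(
                self._num_threads)

        def process(self, batch):
            # Fan one batch out across the pool; results keep input order.
            yield from self._pool.map(self._fn, batch)

        def teardown(self):
            self._pool.shutdown()

    # Usage: batch elements first so each process() call can fill the pool.
    # ... | beam.BatchElements() | beam.ParDo(ThreadedMapFn(fetch_url))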

Python API reference docs could use better organization

2021-04-30 Thread Stephan Hoyer
(Note: I also filed this as a JIRA [1] a few days ago, but I noticed that the mailing list seems to be a better place for opening discussions.) I've been enjoying diving into Beam recently, but to my frustration I've found that I often need to look through the source code to discover APIs. Beam…

Re: Consider Cloudpickle instead of dill for Python pickling

2021-04-30 Thread Stephan Hoyer
…With cloudpickle we will not be able to have a tight range. We > could solve this problem by passing the version of the pickler used at job > submission, and have a check on the runner to make sure that the client > version is not newer than the runner's version. Additionally, we should > make…

Consider Cloudpickle instead of dill for Python pickling

2021-04-29 Thread Stephan Hoyer
cloudpickle [1] and dill [2] are two Python packages that implement extensions of Python's pickle protocol for arbitrary objects. Beam currently uses dill, but I'm wondering if we could consider additionally or alternatively using cloudpickle instead. Overall, cloudpickle seems to be a more popular…
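
The practical difference shows up with closures, which cloudpickle serializes by value while producing bytes the standard pickle module can load (a standalone sketch; for context, later Beam releases did add a pickle_library pipeline option that can select cloudpickle):

    import pickle
    import cloudpickle

    def make_adder(n):
        return lambda x: x + n

    fn = make_adder(2)
    data = cloudpickle.dumps(fn)   # serializes the closure by value
    restored = pickle.loads(data)  # standard pickle can load the bytes
    assert restored(3) == 5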