Hi Jacek,

Yes, I would be able to re-write the extracts from Postgres.  Is there an
easy way to partition the results of a SQL query or would I need to write
something?
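
Something along these lines is what I had in mind: a rough, untested sketch
that mirrors the modulo-on-primary-key partitioning from my pyarrow program
(it assumes an integer primary key, and the table name, key column and
connection details below are made up):

import json

import psycopg2

NWAYS = 8  # number of partitions to dump

# Hypothetical connection string and table/column names for illustration.
conn = psycopg2.connect("dbname=mydb")

with conn, conn.cursor() as cur:
    for ptn in range(NWAYS):
        # Select only the rows whose primary key falls in this partition,
        # so each output file can later be diffed independently.
        cur.execute(
            "SELECT * FROM my_table WHERE id %% %s = %s",
            (NWAYS, ptn),
        )
        cols = [d[0] for d in cur.description]
        with open(f"my_table_part_{ptn}.jsonl", "w") as out:
            for row in cur:
                out.write(json.dumps(dict(zip(cols, row)), default=str) + "\n")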

Many thanks

Adrian

On Fri, 7 Jul 2023 at 12:53, Jacek Pliszka <[email protected]> wrote:

> Hi!
>
> If you have any influence over how the data is dumped from Postgres, my
> suggestion is to have it partitioned at that stage.
>
> That would make parallelization much easier.
>
> BR
>
> Jacek
>
> On Fri, 7 Jul 2023 at 12:21, Adrian Mowat <[email protected]> wrote:
> >
> > Hi,
> >
> > TL;DR: newbie question.  I have a pyarrow program that uses
> > pyarrow.compute.diff to compare two 10M row tables.  To handle the large
> > volume, I partition my data into subsets by primary key and diff each
> > subset individually.  It works, but I don't think it performs as well as
> > it could and I'm wondering if there are better ways to solve this problem.
> >
> > Full question:
> >
> > I'm new to Arrow and I'm running a proof of concept on using it to find
> > differences between two large data sets.  My use case is that we have a
> > job that dumps some Postgres tables to S3 as JSON every night, and I want
> > to run a batch job that compares one day's data with the next so we can
> > send the change sets to downstream systems in other parts of the company.
> > Some of the tables have over 30M rows (and growing), so I need something
> > that is performant and can handle a large volume of data.
> >
> > For my proof of concept, I downloaded 10M rows of one of the tables from
> 1 day and then another 10M from the next day.  I then made separate "small"
> (100,000 rows) and "medium" (1M rows) sized subsets for development.  If
> this exercise is successful and we decide to go ahead with a project using
> Arrow, it will probably be written in Ruby because that's the main language
> at my company.  I used Python for the proof of concept because it seems
> better documented and more widely used so it should be easier to find
> documentation etc.
> >
> > My first attempt was to load the data into Arrow tables and then use
> > compute.diff to find the differences.  This worked fine for the small and
> > medium-sized data sets, but when I ran it against the "large" (10M row)
> > data set my program failed with an error:
> >
> > There are more than 2^32 bytes of key data.  Acero cannot process a join
> of this magnitude
> >
> > This is obviously a problem, so I decided to partition the data into
> > subsets based on the primary key like this:
> >
> > def partition_by_key(table, key, nways):
> >     partitions = [[] for i in range(0, nways)]
> >
> >     # Slice up key values into nways partitions
> >     for idx, val in enumerate(table.column(key)):
> >         ptn = val.as_py() % nways
> >         partitions[ptn].append(idx)
> >
> >     # Use the key partitions to create nways
> >     # masks over the data
> >     result = []
> >     for idx, indexes in enumerate(partitions):
> >         mask = [False for i in range(0, len(table))]
> >         for i in indexes:
> >             mask[i] = True
> >         result.append(table.filter(mask))
> >
> >     return result
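> >
> > As an aside, I suspect the per-row Python loop above is the slowest part.
> > A vectorized sketch of the same idea (untested at the full 10M rows, and
> > it assumes an integer key with no nulls) would be something like:
> >
> > import pyarrow as pa
> >
> > def partition_by_key_vectorized(table, key, nways):
> >     # Compute every row's partition number in one vectorized step
> >     # instead of looping over the rows in Python.
> >     keys = table.column(key).to_numpy()
> >     ptns = keys % nways
> >     # Build one boolean mask per partition and filter the table with it.
> >     return [table.filter(pa.array(ptns == i)) for i in range(nways)]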
> >
> > And then I diff each partition in turn like this:
> >
> > def diff_partitions(lhs_parts, rhs_parts, primary_key,
> >                     find_changes_expression):
> >     inserted = []
> >     changed = []
> >     deleted = []
> >     for lhs, rhs in zip(lhs_parts, rhs_parts):
> >         i, c, d = compute.diff(lhs, rhs, primary_key,
> >                                find_changes_expression)
> >         inserted.append(i)
> >         changed.append(c)
> >         deleted.append(d)
> >     return inserted, changed, deleted
> >
> > So the main part of my program reads:
> >
> > # utility function that loads all the JSONL
> > # files in a directory into an Arrow table
> > lhs = jsonl.load_jsonl(lhs_dir, schema)
> > rhs = jsonl.load_jsonl(rhs_dir, schema)
> >
> > num_rows = max(len(lhs), len(rhs))
> > nways = multiprocessing.cpu_count()
> >
> > lhs_parts = partition_by_key(lhs, primary_key, nways)
> > rhs_parts = partition_by_key(rhs, primary_key, nways)
> >
> > # find_changes_expression is hard-coded elsewhere in the program
> > inserted, changed, deleted = diff_partitions(
> >     lhs_parts, rhs_parts, primary_key, find_changes_expression)
> >
> > This works but I have some questions:
> >
> > Have I used Arrow in the best possible way here?  Or are there better
> ways to approach this problem?
> >
> > One thing I don't like is that I am running compute.diff against each
> > data partition one after another.  I can see I have plenty of spare CPU
> > capacity while this code is running, so I'd like to be able to diff all
> > the partitions concurrently.  Does Arrow provide any support for true
> > multithreading so I can get around Python's limited threading
> > capabilities?
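> >
> > Something like this is what I had in mind, assuming (I haven't verified
> > this) that pyarrow does its heavy lifting in C++ with the GIL released,
> > so a plain thread pool would give real parallelism:
> >
> > from concurrent.futures import ThreadPoolExecutor
> >
> > def diff_partitions_threaded(lhs_parts, rhs_parts, primary_key,
> >                              find_changes_expression):
> >     # Same as diff_partitions above, but each pair of partitions is
> >     # diffed in its own thread.  `compute.diff` is the same helper used
> >     # earlier in the program.
> >     def diff_one(pair):
> >         lhs, rhs = pair
> >         return compute.diff(lhs, rhs, primary_key, find_changes_expression)
> >
> >     with ThreadPoolExecutor() as pool:
> >         results = list(pool.map(diff_one, zip(lhs_parts, rhs_parts)))
> >
> >     inserted = [i for i, _, _ in results]
> >     changed = [c for _, c, _ in results]
> >     deleted = [d for _, _, d in results]
> >     return inserted, changed, deleted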
> >
> > In general, I've found the docs to be good at explaining how Arrow works
> but I couldn't find much about how to "think in arrow" when solving
> problems.  Is there anything out there I might want to look at?
> >
> > Many Thanks
> >
> > Adrian
> >
