
If you have any influence over how the data is dumped from Postgres, my
suggestion is to have it partitioned already at that stage.

That would make parallelization much easier.
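Here is a minimal sketch of what that could look like on the consuming side. It assumes, hypothetically, that each nightly dump is written as one JSONL file per key bucket, e.g. `<day_dir>/bucket=03.jsonl`, with rows assigned by a fixed rule such as `id % N` at dump time. The file naming and the `pair_bucket_files` helper are illustrative, not an existing API. The point is that each bucket pair can then be loaded and diffed on its own, with no re-partitioning in Python.

```python
from pathlib import Path


def pair_bucket_files(lhs_dir, rhs_dir, pattern="bucket=*.jsonl"):
    """Match per-bucket dump files from two days by file name.

    Hypothetical layout: each day's dump directory holds one JSONL file per
    key bucket, e.g. bucket=00.jsonl, bucket=01.jsonl, ...
    Returns a sorted list of (bucket_name, lhs_path, rhs_path) tuples.
    """
    lhs = {p.name: p for p in Path(lhs_dir).glob(pattern)}
    rhs = {p.name: p for p in Path(rhs_dir).glob(pattern)}
    if lhs.keys() != rhs.keys():
        # Both dumps must use the same bucket rule and count; otherwise one
        # key could land in different buckets on different days.
        missing = sorted(lhs.keys() ^ rhs.keys())
        raise ValueError(f"bucket files present on only one side: {missing}")
    return [(name, lhs[name], rhs[name]) for name in sorted(lhs)]
```

Because a given key always lands in the same bucket on both days, each tuple can be handed to a separate worker.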



On Fri, 7 Jul 2023 at 12:21, Adrian Mowat <adr...@altmetric.com> wrote:
> Hi,
> TL;DR: newbie question. I have a pyarrow program that uses
> pyarrow.compute.diff to compare two 10M-row tables. To handle the
> large volume, I partition my data into subsets by primary key and diff each
> one individually. It works, but I don't think it performs as well as it could,
> and I'm wondering if there are better ways to solve this problem.
> Full question:
> I'm new to Arrow and I'm running a proof of concept on using it to find
> differences between two large data sets. My use case is that we have a job
> that dumps some Postgres tables to S3 as JSON every night, and I want to run a
> batch job that compares one day's data with the next so we can send the change
> sets to downstream systems in other parts of the company. Some of the tables
> have over 30M rows of data (and growing), so I need something that is
> performant and can handle a large volume of data.
> For my proof of concept, I downloaded 10M rows of one of the tables from one
> day and then another 10M from the next day. I then made separate "small"
> (100,000 rows) and "medium" (1M rows) subsets for development. If this
> exercise is successful and we decide to go ahead with a project using Arrow,
> it will probably be written in Ruby because that's the main language at my
> company. I used Python for the proof of concept because it seems better
> documented and more widely used, so it should be easier to find documentation,
> etc.
> My first attempt was to load the data into Arrow tables and then use
> compute.diff to find the differences. This worked fine for the small and
> medium data sets, but when I ran it against the "large" (10M-row) data
> my program failed with an error:
> There are more than 2^32 bytes of key data.  Acero cannot process a join of 
> this magnitude
> This is obviously a problem, so I decided to partition the data into subsets
> based on the primary key like this:
> def partition_by_key(table, key, nways):
>     partitions = [[] for i in range(0, nways)]
>     # Slice up key values into nways partitions
>     for idx, val in enumerate(table.column(key)):
>         ptn = val.as_py() % nways
>         partitions[ptn].append(idx)
>     # Use the key partitions to create nways
>     # masks over the data
>     result = []
>     for idx, indexes in enumerate(partitions):
>         mask = [False for i in range(0, len(table))]
>         for i in indexes:
>             mask[i] = True
>         result.append(table.filter(mask))
>     return result
> And then diff each partition in turn like this:
> def diff_partitions(lhs_parts, rhs_parts, primary_key,
>                     find_changes_expression):
>     inserted = []
>     changed = []
>     deleted = []
>     for lhs, rhs in zip(lhs_parts, rhs_parts):
>         i, c, d = compute.diff(lhs, rhs, primary_key, find_changes_expression)
>         inserted.append(i)
>         changed.append(c)
>         deleted.append(d)
>     return inserted, changed, deleted
> So the main part of my program reads:
> import multiprocessing
> # utility function that loads all the JSONL
> # files in a directory into an Arrow table
> lhs = jsonl.load_jsonl(lhs_dir, schema)
> rhs = jsonl.load_jsonl(rhs_dir, schema)
> num_rows = max(len(lhs), len(rhs))
> nways = multiprocessing.cpu_count()
> lhs_parts = partition_by_key(lhs, primary_key, nways)
> rhs_parts = partition_by_key(rhs, primary_key, nways)
> # find_changes_expression is hard-coded elsewhere in the program
> inserted, changed, deleted = diff_partitions(lhs_parts, rhs_parts,
>                                              primary_key, find_changes_expression)
> This works but I have some questions:
> Have I used Arrow in the best possible way here?  Or are there better ways to 
> approach this problem?
> One thing I don't like is that I am running compute.diff against each data
> partition one after another. I can see I have plenty of spare CPU capacity
> when this code is running, so I'd like to be able to diff all the partitions
> concurrently. Does Arrow provide any support for true multithreading so I
> can get around Python's limited threading capabilities?
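On running the partitions concurrently: pyarrow's compute kernels and joins run in C++ and, as far as I know, release the GIL while they work. A plain thread pool may therefore be enough to overlap the per-partition diffs without resorting to multiprocessing. Arrow also has its own internal thread pool (see `pyarrow.cpu_count()` and `pyarrow.set_cpu_count()`), which many operations already use, so the gain is worth measuring rather than assuming. In the sketch below, `diff_fn` is any function that returns an `(inserted, changed, deleted)` tuple for one partition pair. The helper name is hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor


def diff_partitions_concurrently(lhs_parts, rhs_parts, diff_fn, max_workers=None):
    """Run diff_fn(lhs, rhs) for each partition pair on a thread pool.

    diff_fn must return an (inserted, changed, deleted) tuple. Results come
    back in partition order, like the sequential diff_partitions.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map yields results in input order, whatever order threads finish.
        results = list(pool.map(diff_fn, lhs_parts, rhs_parts))
    if not results:
        return [], [], []
    inserted, changed, deleted = (list(col) for col in zip(*results))
    return inserted, changed, deleted
```

With the names from the post, the call might be `diff_partitions_concurrently(lhs_parts, rhs_parts, lambda l, r: compute.diff(l, r, primary_key, find_changes_expression))`.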
> In general, I've found the docs to be good at explaining how Arrow works but 
> I couldn't find much about how to "think in arrow" when solving problems.  Is 
> there anything out there I might want to look at?
> Many Thanks
> Adrian
