Hi Jacek,

Yes, I would be able to rewrite the extracts from Postgres. Is there an
easy way to partition the results of a SQL query or would I need to write
something?
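If I did need to write something myself, I imagine it would be roughly one
COPY per bucket with a modulo predicate on the primary key, along the lines
of the sketch below (using psycopg2; the connection details, table name,
integer primary key "id" and file names are only placeholders, and the
existing JSON serialisation would go inside the SELECT). The important bit
is that the bucket is a pure function of the key, so the two days' dumps
partition the same way:

import psycopg2

NWAYS = 8  # number of partitions, e.g. one per CPU core

conn = psycopg2.connect("dbname=mydb")  # placeholder connection details
cur = conn.cursor()

for p in range(NWAYS):
    # Rows with the same primary key always land in the same bucket,
    # so the partitions from day N and day N+1 line up with each other.
    sql = f"COPY (SELECT * FROM my_table WHERE id % {NWAYS} = {p}) TO STDOUT"
    with open(f"my_table_part_{p}.txt", "w") as out:
        cur.copy_expert(sql, out)

conn.close()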
Many thanks

Adrian

On Fri, 7 Jul 2023 at 12:53, Jacek Pliszka <[email protected]> wrote:

> Hi!
>
> If you have any influence over how data is dumped from postgres - my
> suggestion is to have it already partitioned then.
>
> This would make parallelization much easier.
>
> BR
>
> Jacek
>
> On Fri, 7 Jul 2023 at 12:21, Adrian Mowat <[email protected]> wrote:
> >
> > Hi,
> >
> > TL;DR: newbie question. I have a pyarrow program that uses
> > pyarrow.compute.diff to compare two 10M-row tables. In order to handle
> > the large volume, I partition my data into subsets by primary key and
> > diff each one individually. It works, but I don't think it performs as
> > well as it could, and I'm wondering if there are better ways to solve
> > this problem.
> >
> > Full question:
> >
> > I'm new to Arrow and I'm running a proof of concept on using it to
> > find differences between two large data sets. My use case is that we
> > have a job that dumps some Postgres tables to S3 as JSON every night,
> > and I want to run a batch job that compares one day's data with the
> > next so we can send the change sets to downstream systems in other
> > parts of the company. Some of the tables have over 30M rows of data
> > (and growing), so I need something that is performant and can handle
> > a large volume of data.
> >
> > For my proof of concept, I downloaded 10M rows of one of the tables
> > from one day and then another 10M from the next day. I then made
> > separate "small" (100,000 rows) and "medium" (1M rows) subsets for
> > development. If this exercise is successful and we decide to go ahead
> > with a project using Arrow, it will probably be written in Ruby
> > because that's the main language at my company. I used Python for the
> > proof of concept because it seems better documented and more widely
> > used, so it should be easier to find documentation etc.
> >
> > My first attempt was to load the data into Arrow tables and then use
> > compute.diff to find the differences. This worked fine for the small
> > and medium sized data sets, but when I ran it against the "large"
> > (10M row) data my program failed with an error:
> >
> > There are more than 2^32 bytes of key data. Acero cannot process a
> > join of this magnitude
> >
> > This is obviously a problem, so I decided to partition the data into
> > subsets based on the primary key like this:
> >
> > def partition_by_key(table, key, nways):
> >     partitions = [[] for i in range(0, nways)]
> >
> >     # Slice up key values into nways partitions
> >     for idx, val in enumerate(table.column(key)):
> >         ptn = val.as_py() % nways
> >         partitions[ptn].append(idx)
> >
> >     # Use the key partitions to create nways
> >     # masks over the data
> >     result = []
> >     for idx, indexes in enumerate(partitions):
> >         mask = [False for i in range(0, len(table))]
> >         for i in indexes:
> >             mask[i] = True
> >         result.append(table.filter(mask))
> >
> >     return result
> >
> > And then diff each partition in turn like this:
> >
> > def diff_partitions(lhs_parts, rhs_parts, primary_key, find_changes_expression):
> >     inserted = []
> >     changed = []
> >     deleted = []
> >     for lhs, rhs in zip(lhs_parts, rhs_parts):
> >         i, c, d = compute.diff(lhs, rhs, primary_key, find_changes_expression)
> >         inserted.append(i)
> >         changed.append(c)
> >         deleted.append(d)
> >     return inserted, changed, deleted
> >
> > So the main part of my program reads:
> >
> > # utility function that loads all the JSONL
> > # files in a directory into an Arrow table
> > lhs = jsonl.load_jsonl(lhs_dir, schema)
> > rhs = jsonl.load_jsonl(rhs_dir, schema)
> >
> > num_rows = max(len(lhs), len(rhs))
> > nways = multiprocessing.cpu_count()
> >
> > lhs_parts = partition_by_key(lhs, primary_key, nways)
> > rhs_parts = partition_by_key(rhs, primary_key, nways)
> >
> > # find_changes_expression is hard-coded elsewhere in the program
> > inserted, changed, deleted = diff_partitions(lhs_parts, rhs_parts,
> >                                              primary_key, find_changes_expression)
> >
> > This works, but I have some questions:
> >
> > Have I used Arrow in the best possible way here? Or are there better
> > ways to approach this problem?
> >
> > One thing I don't like is that I am running compute.diff against each
> > data partition one after another. I can see I have plenty of spare CPU
> > capacity when this code is running, so I'd like to be able to diff all
> > the partitions concurrently. Does Arrow provide any support for true
> > multithreading so I can get around Python's limited threading
> > capabilities?
> >
> > In general, I've found the docs to be good at explaining how Arrow
> > works, but I couldn't find much about how to "think in Arrow" when
> > solving problems. Is there anything out there I might want to look at?
> >
> > Many Thanks
> >
> > Adrian
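
PS: re-reading partition_by_key above, I suspect the per-row Python loops
are the slow part. A rough sketch of a vectorised alternative (assuming the
key column is an integer type with no nulls; the modulo runs in NumPy
because I'm not sure pyarrow exposes a modulo kernel, and Table.filter
accepts a boolean array directly):

import pyarrow as pa

def partition_by_key(table, key, nways):
    # Materialise the key column once instead of calling .as_py()
    # on every value (assumes an integer key column with no nulls).
    keys = table.column(key).to_numpy()
    buckets = keys % nways

    # One boolean mask per partition; Table.filter takes the mask
    # directly, so no index lists or per-row loops are needed.
    return [table.filter(pa.array(buckets == p)) for p in range(nways)]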
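And on my question about diffing the partitions concurrently: one idea I'm
considering is a process pool rather than threads, since Arrow tables can
be pickled between processes. A rough sketch only, assuming compute.diff is
the same helper used above and is importable by the worker processes, and
that find_changes_expression can be pickled:

from concurrent.futures import ProcessPoolExecutor
from functools import partial

# `compute` here is whatever module provides compute.diff in the program
# above; the workers need to be able to import it.

def diff_one(pair, primary_key, find_changes_expression):
    lhs, rhs = pair
    return compute.diff(lhs, rhs, primary_key, find_changes_expression)

def diff_partitions(lhs_parts, rhs_parts, primary_key, find_changes_expression):
    work = partial(diff_one, primary_key=primary_key,
                   find_changes_expression=find_changes_expression)
    with ProcessPoolExecutor() as pool:
        # Each (lhs, rhs) partition pair is diffed in its own process.
        results = list(pool.map(work, zip(lhs_parts, rhs_parts)))
    inserted = [i for i, _, _ in results]
    changed = [c for _, c, _ in results]
    deleted = [d for _, _, d in results]
    return inserted, changed, deleted

One thing I'm unsure about is the cost of pickling each partition across to
the workers; for the 10M-row tables it might be cheaper to write each
partition to a file and have each worker read its own slice instead.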
