Hi! If you have any influence over how the data is dumped from postgres, my suggestion is to have it partitioned already at that point.
This would make parallelization much easier.

BR,
Jacek

On Fri, 7 Jul 2023 at 12:21, Adrian Mowat <adr...@altmetric.com> wrote:
>
> Hi,
>
> TL;DR: newbie question. I have a pyarrow program that uses
> pyarrow.compute.diff to compare two 10M-row tables. In order to handle
> the large volume, I partition my data into subsets by primary key and
> diff each one individually. It works, but I don't think it performs as
> well as it could and I'm wondering if there are better ways to solve
> this problem.
>
> Full question:
>
> I'm new to Arrow and I'm running a proof of concept on using it to find
> differences between two large data sets. My use case is that we have a
> job that dumps some postgres tables to S3 as JSON every night, and I
> want to run a batch job that compares one day's data with the next so
> we can send the change sets to downstream systems in other parts of the
> company. Some of the tables have over 30M rows of data (and growing),
> so I need something that is performant and can handle a large volume of
> data.
>
> For my proof of concept, I downloaded 10M rows of one of the tables
> from one day and then another 10M from the next day. I then made
> separate "small" (100,000 rows) and "medium" (1M rows) subsets for
> development. If this exercise is successful and we decide to go ahead
> with a project using Arrow, it will probably be written in Ruby because
> that's the main language at my company. I used Python for the proof of
> concept because it seems better documented and more widely used, so it
> should be easier to find documentation etc.
>
> My first attempt was to load the data into Arrow tables and then use
> compute.diff to find the differences. This worked fine for the small
> and medium sized data sets, but when I ran it against the "large" (10M
> row) data my program failed with an error:
>
>     There are more than 2^32 bytes of key data.
>     Acero cannot process a join of this magnitude
>
> This is obviously a problem, so I decided to partition the data into
> subsets based on the primary key like this:
>
>     def partition_by_key(table, key, nways):
>         partitions = [[] for i in range(0, nways)]
>
>         # Slice up key values into nways partitions
>         for idx, val in enumerate(table.column(key)):
>             ptn = val.as_py() % nways
>             partitions[ptn].append(idx)
>
>         # Use the key partitions to create nways
>         # masks over the data
>         result = []
>         for idx, indexes in enumerate(partitions):
>             mask = [False for i in range(0, len(table))]
>             for i in indexes:
>                 mask[i] = True
>             result.append(table.filter(mask))
>
>         return result
>
> and then diff each partition in turn like this:
>
>     def diff_partitions(lhs_parts, rhs_parts, primary_key,
>                         find_changes_expression):
>         inserted = []
>         changed = []
>         deleted = []
>         for lhs, rhs in zip(lhs_parts, rhs_parts):
>             i, c, d = compute.diff(lhs, rhs, primary_key,
>                                    find_changes_expression)
>             inserted.append(i)
>             changed.append(c)
>             deleted.append(d)
>         return inserted, changed, deleted
>
> So the main part of my program reads:
>
>     # utility function that loads all the JSONL
>     # files in a directory into an Arrow table
>     lhs = jsonl.load_jsonl(lhs_dir, schema)
>     rhs = jsonl.load_jsonl(rhs_dir, schema)
>
>     num_rows = max(len(lhs), len(rhs))
>     nways = multiprocessing.cpu_count()
>
>     lhs_parts = partition_by_key(lhs, primary_key, nways)
>     rhs_parts = partition_by_key(rhs, primary_key, nways)
>
>     # find_changes_expression is hard-coded elsewhere in the program
>     inserted, changed, deleted = diff_partitions(lhs_parts, rhs_parts,
>                                                  primary_key,
>                                                  find_changes_expression)
>
> This works, but I have some questions:
>
> Have I used Arrow in the best possible way here? Or are there better
> ways to approach this problem?
>
> One thing I don't like is that I am running compute.diff against each
> data partition one after another.
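A note on the quoted partition_by_key: the Python-level loops build each mask one row at a time, which is slow at 10M rows. The same partitioning can be done in one vectorized pass, e.g. with NumPy. A sketch, assuming the key column holds integers with no nulls (which the `% nways` in the quoted code already implies); the function name is my own:

    import numpy as np
    import pyarrow as pa

    def partition_by_key_vectorized(table, key, nways):
        # Pull the key column out as one NumPy array and compute
        # every row's partition number in a single vectorized pass.
        keys = table.column(key).to_numpy()
        ptn = keys % nways
        # Table.filter accepts a boolean array-like, so each
        # partition is one comparison plus one filter call.
        return [table.filter(ptn == p) for p in range(nways)]

Each filter call is an Arrow kernel rather than a Python loop, so the mask-building cost drops from per-row Python work to a handful of array operations.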
> I can see I have plenty of spare CPU capacity when this code is
> running, so I'd like to be able to diff all the partitions
> concurrently. Does Arrow provide any support for true multithreading so
> I can get around Python's limited threading capabilities?
>
> In general, I've found the docs to be good at explaining how Arrow
> works, but I couldn't find much about how to "think in arrow" when
> solving problems. Is there anything out there I might want to look at?
>
> Many Thanks
>
> Adrian
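On the multithreading question: PyArrow's compute kernels release the GIL while they run, so a standard concurrent.futures.ThreadPoolExecutor can genuinely overlap per-partition work from plain Python, without multiprocessing. A minimal sketch; `diff_one` below is a self-contained stand-in for the per-partition compute.diff call in the quoted code, not a reproduction of it:

    import pyarrow as pa
    from concurrent.futures import ThreadPoolExecutor

    # Stand-in for the poster's per-partition diff: here it just
    # reports the row counts of each side so the sketch is runnable.
    def diff_one(lhs, rhs):
        return lhs.num_rows, rhs.num_rows

    def diff_partitions_threaded(lhs_parts, rhs_parts, max_workers=None):
        # Arrow kernels drop the GIL while executing, so the threads
        # in this pool can run per-partition work concurrently.
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            return list(pool.map(diff_one, lhs_parts, rhs_parts))

Swapping `diff_one` for the real diff call keeps the structure of diff_partitions intact while letting the spare cores do useful work.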