Hi,
TL;DR: newbie question. I have a pyarrow program that uses
pyarrow.compute.diff to compare two 10M row tables. In order to handle the
large volume, I partition my data into subsets by primary key and diff each
one individually. It works, but I don't think it performs as well as it
could, and I'm wondering if there are better ways to solve this problem.
Full question:
I'm new to Arrow and I'm running a proof of concept on using it to find
differences between two large data sets. My use case is that we have a job
that dumps some Postgres tables to S3 as JSON every night, and I want to run
a batch job that compares one day's data with the next day's so we can send
the change sets to downstream systems in other parts of the company. Some of
the tables have over 30M rows (and growing), so I need something that is
performant and can handle a large volume of data.
For my proof of concept, I downloaded 10M rows of one of the tables from one
day and another 10M from the next day. I then made separate "small"
(100,000 rows) and "medium" (1M rows) subsets for development. If this
exercise is successful and we decide to go ahead with a project using
Arrow, it will probably be written in Ruby because that's the main language
at my company. I used Python for the proof of concept because it seems
better documented and more widely used, so it should be easier to find
examples and help.
My first attempt was to load the data into Arrow tables and then use
compute.diff to find the differences. This worked fine for the small and
medium-sized data sets, but when I ran it against the "large" (10M row) data
set my program failed with an error:
There are more than 2^32 bytes of key data. Acero cannot process a join of
this magnitude
This is obviously a problem, so I decided to partition the data into subsets
based on the primary key, like this:
def partition_by_key(table, key, nways):
    partitions = [[] for _ in range(nways)]
    # Slice up key values into nways partitions
    for idx, val in enumerate(table.column(key)):
        ptn = val.as_py() % nways
        partitions[ptn].append(idx)
    # Use the key partitions to create nways
    # masks over the data
    result = []
    for indexes in partitions:
        mask = [False] * len(table)
        for i in indexes:
            mask[i] = True
        result.append(table.filter(mask))
    return result
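One thing I suspect is slow here is the row-by-row Python loop. Purely as a
sketch (assuming the primary key is an integer column with no nulls, and
doing the modulo on a numpy array because I couldn't find a modulo kernel in
pyarrow.compute), a vectorized version of the same idea might look like:

import pyarrow as pa

def partition_by_key_vectorized(table, key, nways):
    # Pull the key column out of Arrow into a numpy array
    keys = table.column(key).to_numpy()
    # Assign each row to a bucket, then build one boolean mask per bucket
    buckets = keys % nways
    return [table.filter(pa.array(buckets == p)) for p in range(nways)]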
With the partitions in place, I then diff each one in turn like this:
def diff_partitions(lhs_parts, rhs_parts, primary_key,
                    find_changes_expression):
    inserted = []
    changed = []
    deleted = []
    for lhs, rhs in zip(lhs_parts, rhs_parts):
        i, c, d = compute.diff(lhs, rhs, primary_key,
                               find_changes_expression)
        inserted.append(i)
        changed.append(c)
        deleted.append(d)
    return inserted, changed, deleted
So the main part of my program reads:
# utility function that loads all the JSONL
# files in a directory into an Arrow table
lhs = jsonl.load_jsonl(lhs_dir, schema)
rhs = jsonl.load_jsonl(rhs_dir, schema)
num_rows = max(len(lhs), len(rhs))
nways = multiprocessing.cpu_count()
lhs_parts = partition_by_key(lhs, primary_key, nways)
rhs_parts = partition_by_key(rhs, primary_key, nways)
# find_changes_expression is hard-coded elsewhere in the program
inserted, changed, deleted = diff_partitions(lhs_parts, rhs_parts,
                                             primary_key,
                                             find_changes_expression)
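For context, load_jsonl is a small helper of my own. Roughly speaking (this
is a simplified sketch that assumes newline-delimited files with a .jsonl
extension, read via pyarrow.json.read_json with an explicit schema), it does
something like:

from pathlib import Path
import pyarrow as pa
import pyarrow.json as pj

def load_jsonl(directory, schema):
    # Read each newline-delimited JSON file with the given schema,
    # then stitch the per-file tables together into a single table
    parse_options = pj.ParseOptions(explicit_schema=schema)
    tables = [pj.read_json(str(path), parse_options=parse_options)
              for path in sorted(Path(directory).glob("*.jsonl"))]
    return pa.concat_tables(tables)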
This works but I have some questions:
Have I used Arrow in the best possible way here? Or are there better ways
to approach this problem?
One thing I don't like is that I am running compute.diff against each data
partition one after another. I can see that I have plenty of spare CPU
capacity while this code is running, so I'd like to be able to diff all the
partitions concurrently. Does Arrow provide any support for true
multithreading so I can get around Python's limited threading capabilities?
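For example, I wondered whether something along these lines would actually
run the diffs in parallel, on the assumption that the heavy lifting inside
compute.diff happens in Arrow's C++ code and releases the GIL (if it
doesn't, I guess I would need a process pool instead):

from concurrent.futures import ThreadPoolExecutor

def diff_partitions_threaded(lhs_parts, rhs_parts, primary_key,
                             find_changes_expression, max_workers=None):
    # Diff each partition pair in a thread pool; this only gives real
    # parallelism if the underlying kernels release the GIL
    def diff_one(pair):
        lhs, rhs = pair
        return compute.diff(lhs, rhs, primary_key, find_changes_expression)

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        results = list(pool.map(diff_one, zip(lhs_parts, rhs_parts)))

    inserted = [i for i, _, _ in results]
    changed = [c for _, c, _ in results]
    deleted = [d for _, _, d in results]
    return inserted, changed, deleted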
In general, I've found the docs to be good at explaining how Arrow works
but I couldn't find much about how to "think in arrow" when solving
problems. Is there anything out there I might want to look at?
Many Thanks
Adrian