Thanks Sean

I imagine this is a fairly common problem in data science. Any idea how others 
solve it? For example, I wonder if running the join in something like BigQuery might 
work better? I do not know much about its implementation.

No one tool will solve all problems. Once I get the matrix, I think Spark 
will work well for our needs.

Kind regards

Andy

From: Sean Owen <sro...@gmail.com>
Date: Monday, April 18, 2022 at 6:58 PM
To: Andrew Davidson <aedav...@ucsc.edu>
Cc: "user @spark" <user@spark.apache.org>
Subject: Re: How is union() implemented? Need to implement column bind

A join is the natural answer, but this is a 10114-way join, which probably 
chokes just trying to plan it, let alone do all the shuffling of that much data. 
You could maybe tune your way out of it, but I'm not optimistic. It's just huge.

You could go off-road and lower-level to take advantage of the structure of the 
data. You effectively want "column bind". There is no such operation in Spark. 
(union is 'row bind'.) You could do this with zipPartitions, which is in the RDD 
API and, to my surprise, not in the Python API, but it does exist in Scala. And R (!). 
If you can read several RDDs of data, you can use this method to pair all their 
corresponding values and ultimately get rows of 10114 values out. In fact that 
is how sparklyr implements cbind on Spark, FWIW: 
https://rdrr.io/cran/sparklyr/man/sdf_fast_bind_cols.html
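
For what it's worth, a minimal Scala sketch of that zipPartitions idea, assuming 
each vector is already an RDD of (rowId, value) pairs with identical partitioning 
and row order (cbind2, a, and b are placeholder names):

    import org.apache.spark.rdd.RDD

    // Pair two column vectors positionally inside each partition.
    // Assumes both RDDs have the same partitioning and row order.
    def cbind2(a: RDD[(String, Double)],
               b: RDD[(String, Double)]): RDD[(String, Seq[Double])] =
      a.zipPartitions(b) { (itA, itB) =>
        itA.zip(itB).map { case ((id, va), (_, vb)) => (id, Seq(va, vb)) }
      }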

The issue I see is that you can only zip a few at a time; you don't want to zip 
10114 of them. Perhaps you have to do that iteratively, and I don't know if 
that will run into the same issues with huge plans.
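
A rough sketch of what that iteration might look like, under the same assumptions 
as above, and with the caveat that the lineage grows with every zip, so periodic 
checkpointing would likely be needed (cbindAll and vectors are placeholder names):

    import org.apache.spark.rdd.RDD

    // Fold one column vector at a time onto an accumulator of
    // (rowId, Seq[value]); purely illustrative, not tested at this scale.
    def cbindAll(vectors: Seq[RDD[(String, Double)]]): RDD[(String, Seq[Double])] = {
      val first = vectors.head.mapValues(v => Seq(v))
      vectors.tail.foldLeft(first) { (acc, col) =>
        acc.zipPartitions(col) { (accIt, colIt) =>
          accIt.zip(colIt).map { case ((id, vs), (_, v)) => (id, vs :+ v) }
        }
      }
    }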

I like the pivot idea. You could read the individual files as data rows: list all 
the file names, parallelize them with Spark, and write a UDF that reads the data 
for each file and generates its rows. If you emit (file, index, value), group by 
index, and pivot on file (I think?), that should be about it. I don't think it 
needs additional hashing or anything. Not sure how fast it is, but it seems more 
direct than the join as well.
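
A rough Scala sketch of that approach, purely as an illustration: the file list, 
the tab-separated two-column layout, and readOneFile are assumptions about the 
data, and pivoting on ~10k distinct values may need the values passed explicitly 
(or spark.sql.pivotMaxValues raised):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.first

    val spark = SparkSession.builder.getOrCreate()
    import spark.implicits._

    // Hypothetical: paths of the 10114 per-sample files, readable from executors.
    val fileNames: Seq[String] = Seq("sample_0001.tsv", "sample_0002.tsv" /* ... */)

    // Emit (file, rowIndex, value) triples for one tab-separated file.
    def readOneFile(path: String): Iterator[(String, Long, Double)] =
      scala.io.Source.fromFile(path).getLines().zipWithIndex.map {
        case (line, i) => (path, i.toLong, line.split("\t")(1).toDouble)
      }

    val longDf = spark.sparkContext
      .parallelize(fileNames, fileNames.size)   // one partition per file
      .flatMap(readOneFile)
      .toDF("file", "index", "value")

    // One row per original row position, one column per file.
    val wideDf = longDf.groupBy("index").pivot("file", fileNames).agg(first("value"))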

On Mon, Apr 18, 2022 at 8:27 PM Andrew Davidson <aedav...@ucsc.edu.invalid> 
wrote:
Hi, I have a hard problem.

I have 10114 column vectors, each in a separate file. Each file has 2 columns: the 
row id and a numeric value. The row ids are identical across files and in sorted 
order. All the column vectors have the same number of rows, over 5 million. I need 
to combine them into a single table. The row ids are very long strings. The column 
names are about 20 chars long.

My current implementation uses join. This takes a long time on a cluster with 2 
workers totaling 192 vCPUs and 2.8 TB of memory. It often crashes; I mean totally 
dead, start over. Checkpoints do not seem to help: it still crashes and needs to be 
restarted from scratch. What is really surprising is that the final file size is 
only 213 GB! The way I originally got the file was to copy all the column vectors to 
a single BIG IRON machine and use Unix cut and paste. That took about 44 min to run 
once I got all the data moved around, but it was very tedious and error prone. I had 
to move a lot of data around, and it is not a particularly reproducible process. I 
will need to rerun this three more times on different data sets of about the same 
size.

I noticed that Spark has a union() function. It implements row bind. Any idea 
how it is implemented? Is it just map-reduce under the covers?
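
For what it's worth, union on DataFrames (and RDDs) is a narrow operation: it just 
concatenates the inputs' partitions, matching columns by position, with no shuffle 
involved. A tiny Scala illustration:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder.getOrCreate()
    import spark.implicits._

    val a = Seq(("r1", 1.0), ("r2", 2.0)).toDF("id", "value")
    val b = Seq(("r3", 3.0), ("r4", 4.0)).toDF("id", "value")

    // Row bind: b's rows are appended after a's; columns are matched
    // by position (unionByName matches by column name instead).
    val stacked = a.union(b)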

My thought was:

1. Load each column vector.
2. Maybe replace the really long row id strings with integers (sketched below).
3. Convert the column vectors into row vectors using pivot (i.e. matrix transpose).
4. Union all the row vectors into a single table.
5. Pivot the table back so I have the correct column vectors.


I could replace the row ids and column names with integers if needed, and 
restore them later.
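
If it helps, a small Scala sketch of step 2, assuming the vectors are tab-separated 
two-column files (the path and separator below are placeholders); zipWithIndex gives 
each row id a stable integer that can be joined back later to restore the originals:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder.getOrCreate()
    import spark.implicits._

    // Build a lookup from the long row-id strings to compact integers,
    // using any one column vector (they all share the same row ids).
    val idMap = spark.read
      .option("sep", "\t")
      .csv("gs://my-bucket/one_vector.tsv")   // placeholder path
      .select("_c0")                          // the row-id column
      .rdd
      .zipWithIndex()
      .map { case (row, idx) => (row.getString(0), idx) }
      .toDF("row_id", "row_int")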

Maybe I would be better off using many small machines? I assume memory is the 
limiting resource, not CPU. I notice that memory usage reaches 100%. I added 
several TBs of local SSD, but I am not convinced that Spark is using the local disk.
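
One thing that might be worth checking (an assumption, not a diagnosis): Spark only 
spills shuffle and cache data to the directories listed in spark.local.dir, or to 
the cluster manager's equivalent (e.g. YARN's local dirs, which override it), so the 
SSDs have to be among those paths. A sketch with placeholder mount points:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.storage.StorageLevel

    // Point Spark's scratch space at the local SSD mounts (placeholder paths).
    // On YARN this is governed by yarn.nodemanager.local-dirs instead.
    val spark = SparkSession.builder
      .config("spark.local.dir", "/mnt/ssd1,/mnt/ssd2")
      .getOrCreate()

    // For cached intermediates, a storage level that can fall back to disk:
    // someDf.persist(StorageLevel.MEMORY_AND_DISK)   // someDf is a placeholder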


Will this perform better than the join?


· The rows before the final pivot will be very wide (over 5 million columns)

· There will only be 10114 rows before the pivot

I assume the pivots will shuffle all the data. I assume the column-vector pivots 
are trivial. The file-table pivot will be expensive, but it will only need to be 
done once.



Comments and suggestions appreciated

Andy

