cbind? Yeah, though the answer is typically a join. I don't know if there's a better option in a SQL engine, as SQL doesn't have anything to offer except join and pivot either (right?). Certainly, the dominant data storage paradigm is wide tables, whereas you're starting with effectively a huge number of slim two-column tables, which is the impedance mismatch here.
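For what it's worth, because the row ids are identical and in the same order in every file, a column bind outside any SQL engine is just a lockstep read, which is roughly what unix `paste` does. Here is a minimal sketch in plain Python (not Spark). The function name `cbind_files`, the tab delimiter, the absence of a header line in the inputs, and the use of the file stem as the column name are all assumptions for illustration, not something stated in the thread.

```python
import csv
from contextlib import ExitStack
from pathlib import Path


def cbind_files(paths, out_path, delimiter="\t"):
    """Column-bind two-column (row_id, value) files into one wide file.

    Assumes every input lists the same row ids in the same order and has
    no header line. Raises ValueError on a row id mismatch. Returns the
    number of data rows written.
    """
    paths = [Path(p) for p in paths]
    with ExitStack() as stack:
        # Open every input lazily; nothing is loaded into memory up front.
        readers = [
            csv.reader(stack.enter_context(p.open(newline="")), delimiter=delimiter)
            for p in paths
        ]
        out = stack.enter_context(Path(out_path).open("w", newline=""))
        writer = csv.writer(out, delimiter=delimiter, lineterminator="\n")
        # Header row: the file stem stands in for the column name (assumption).
        writer.writerow(["row_id"] + [p.stem for p in paths])
        n_rows = 0
        # zip() stops at the shortest input, so a length mismatch between
        # files is NOT detected here; only row id mismatches are.
        for rows in zip(*readers):
            row_id = rows[0][0]
            if any(r[0] != row_id for r in rows):
                raise ValueError(f"row id mismatch at data row {n_rows}")
            writer.writerow([row_id] + [r[1] for r in rows])
            n_rows += 1
    return n_rows
```

This holds one row per input in memory at a time, so memory is not the constraint; the number of simultaneously open files is. With thousands of inputs the per-process open-file limit would likely force binding in batches and then binding the batch outputs.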
On Wed, Apr 20, 2022 at 1:51 PM Andrew Davidson <aedav...@ucsc.edu> wrote:

> Thanks Sean
>
> I imagine this is a fairly common problem in data science. Any idea how others solve it? For example, I wonder if running the join on something like BigQuery might work better? I do not know much about the implementation.
>
> No one tool will solve all problems. Once I get the matrix I think Spark will work well for our needs.
>
> Kind regards
>
> Andy
>
> *From:* Sean Owen <sro...@gmail.com>
> *Date:* Monday, April 18, 2022 at 6:58 PM
> *To:* Andrew Davidson <aedav...@ucsc.edu>
> *Cc:* "user @spark" <user@spark.apache.org>
> *Subject:* Re: How is union() implemented? Need to implement column bind
>
> A join is the natural answer, but this is a 10114-way join, which probably chokes readily just to even plan it, let alone all the shuffling of huge data. You could tune your way out of it maybe, but I'm not optimistic. It's just huge.
>
> You could go off-road and lower-level to take advantage of the structure of the data. You effectively want "column bind". There is no such operation in Spark. (union is 'row bind'.) You could do this with zipPartitions, which is in the RDD API, and to my surprise, not in the Python API but exists in Scala. And R (!). If you can read several RDDs of data, you can use this method to pair all their corresponding values and ultimately get rows of 10114 values out. In fact that is how sparklyr implements cbind on Spark, FWIW: https://rdrr.io/cran/sparklyr/man/sdf_fast_bind_cols.html
>
> The issue I see is that you can only zip a few at a time; you don't want to zip 10114 of them. Perhaps you have to do that iteratively, and I don't know if that is going to face the same issues with huge huge plans.
>
> I like the pivot idea. If you can read the individual files as data rows (maybe list all the file names, parallelize with Spark, write a UDF that reads the data for that file to generate the rows), emit (file, index, value), then groupBy index and pivot on file (I think?), that should be about it? I think it doesn't need additional hashing or whatever. Not sure how fast it is but that seems more direct than the join, as well.
>
> On Mon, Apr 18, 2022 at 8:27 PM Andrew Davidson <aedav...@ucsc.edu.invalid> wrote:
>
> > Hi, I have a hard problem.
> >
> > I have 10114 column vectors, each in a separate file. Each file has 2 columns: the row id and the numeric values. The row ids are identical and in sort order. All the column vectors have the same number of rows. There are over 5 million rows. I need to combine them into a single table. The row ids are very long strings. The column names are about 20 chars long.
> >
> > My current implementation uses join. This takes a long time on a cluster with 2 workers totaling 192 vCPUs and 2.8 TB of memory. It often crashes. I mean totally dead, start over. Checkpoints do not seem to help; it still crashes and needs to be restarted from scratch. What is really surprising is the final file size is only 213G! The way I got the file was to copy all the column vectors to a single BIG IRON machine and use unix cut and paste. It took about 44 min to run once I got all the data moved around. It was very tedious and error prone. I had to move a lot of data around. Not a particularly reproducible process. I will need to rerun this three more times on different data sets of about the same size.
> >
> > I noticed that Spark has a union() function. It implements row bind. Any idea how it is implemented? Is it just map reduce under the covers?
> >
> > My thought was:
> >
> > 1. load each column vector
> > 2. maybe I need to replace the really long row id strings with integers
> > 3. convert column vectors into row vectors using pivot (i.e., matrix transpose)
> > 4. union all the row vectors into a single table
> > 5. pivot the table back so I have the correct column vectors
> >
> > I could replace the row ids and column names with integers if needed, and restore them later.
> >
> > Maybe I would be better off using many small machines? I assume memory is the limiting resource, not CPU. I notice that memory usage will reach 100%. I added several TB of local SSD. I am not convinced that Spark is using the local disk.
> >
> > Will this perform better than join?
> >
> > · The rows before the final pivot will be very very wide (over 5 million columns)
> > · There will only be 10114 rows before the pivot
> >
> > I assume the pivots will shuffle all the data. I assume the column vectors are trivial. The final table pivot will be expensive, however it will only need to be done once.
> >
> > Comments and suggestions appreciated
> >
> > Andy
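The pivot idea discussed in the thread (emit (file, index, value) triples, group by index, pivot on file) can be sketched on toy data as below. This is plain Python meant only to show the shape of the reshape, not a Spark implementation and not anything the participants posted; `pivot_long_to_wide` and its arguments are hypothetical names. In Spark's DataFrame API the analogous step would presumably be a `groupBy` on the index column followed by `pivot` on the file column with an aggregate such as `first`.

```python
from collections import defaultdict


def pivot_long_to_wide(triples, columns):
    """Group (file, index, value) triples by index and pivot on file.

    `columns` fixes the output column order. A missing (index, file)
    pair is filled with None. Output rows are sorted by index.
    """
    by_index = defaultdict(dict)
    for file_name, index, value in triples:
        # Each index collects one cell per source file.
        by_index[index][file_name] = value
    return [
        [index] + [cells.get(c) for c in columns]
        for index, cells in sorted(by_index.items())
    ]


# Toy input: two "files" (a, b), each contributing one value per row id.
triples = [
    ("a", "r1", 1.0),
    ("b", "r1", 3.0),
    ("a", "r2", 2.0),
    ("b", "r2", 4.0),
]
wide = pivot_long_to_wide(triples, columns=["a", "b"])
```

Unlike the lockstep column bind, this does not rely on the files sharing a row order, at the cost of regrouping every value by its index, which in a distributed setting means a shuffle of the full data set.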