Re: How is union() implemented? Need to implement column bind
Not a max - all values are needed. pivot() if anything is much closer, but see the rest of this thread.

On Thu, Apr 21, 2022 at 1:19 AM Sonal Goyal wrote:

> Will a window partition by rowid and max on column values not work ?
Re: How is union() implemented? Need to implement column bind
Seems like an interesting problem to solve!

If I have understood it correctly, you have 10114 files, each with the structure

rowid, colA
r1, a
r2, b
r3, c
...5 million rows

If you union them, you will have

rowid, colA, colB
r1, a, null
r2, b, null
r3, c, null
r1, null, d
r2, null, e
r3, null, f

Will a window partition by rowid and max on column values not work?

Cheers,
Sonal
https://github.com/zinggAI/zingg
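Sonal's proposal (union the files into one null-padded table, then collapse the rows that share a rowid) can be illustrated with a small plain-Python sketch. This is not Spark code; the function name collapse_by_rowid and the sample rows are assumptions made for illustration, and it only shows the logic on the six example rows from the message.

```python
from collections import defaultdict

# Null-padded rows as they would look after the union: (rowid, colA, colB).
unioned = [
    ("r1", "a", None),
    ("r2", "b", None),
    ("r3", "c", None),
    ("r1", None, "d"),
    ("r2", None, "e"),
    ("r3", None, "f"),
]


def collapse_by_rowid(rows, n_cols):
    """Group rows by rowid and keep the max non-null value in each column."""
    grouped = defaultdict(lambda: [None] * n_cols)
    for rowid, *values in rows:
        merged = grouped[rowid]
        for i, value in enumerate(values):
            # Nulls never win; a non-null value replaces a null or a smaller value.
            if value is not None and (merged[i] is None or value > merged[i]):
                merged[i] = value
    return {rowid: tuple(vals) for rowid, vals in grouped.items()}


collapsed = collapse_by_rowid(unioned, n_cols=2)
```

Because each file contributes exactly one non-null value per rowid and column, the max here simply picks that value. Note that the unioned table has to be padded out to every column first, and another reply in this thread argues that pivot() is a closer fit than max.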
Re: How is union() implemented? Need to implement column bind
Oh, Spark directly supports upserts (with the right data destination) and yeah you could do this as 1+ updates to a table without any pivoting, etc. It'd still end up being 10K+ single joins along the way but individual steps are simpler. It might actually be pretty efficient I/O wise as columnar formats would not rewrite any other data on a write like this.
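The "one update per file" idea from the upsert message can be pictured with a keyed table that gains one column per step. The sketch below is plain Python, not a Spark or table-format API; upsert_column, the column names and the values are hypothetical, and it says nothing about how a real data destination would execute each step.

```python
# Keyed table: rowid -> {column name: value}. Starts empty.
table = {}


def upsert_column(table, column_name, column_vector):
    """Insert rowids that are missing and update existing ones with one new column."""
    for rowid, value in column_vector:
        table.setdefault(rowid, {})[column_name] = value


# Hypothetical inputs: each file contributes one (rowid, value) column vector.
upsert_column(table, "colA", [("r1", "a"), ("r2", "b"), ("r3", "c")])
upsert_column(table, "colB", [("r1", "d"), ("r2", "e"), ("r3", "f")])
```

Each call touches every rowid once, which mirrors the point in the message that the work is still one keyed match per input file even though each step is simple.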
Re: How is union() implemented? Need to implement column bind
Hi Sean

My "insert" solution is a hack that might work, given that we can easily spin up a single VM with a crazy amount of memory. I would prefer to see a distributed solution. It is just a matter of time before someone wants to create an even bigger table using cbind.

I understand you probably already know a lot about traditional RDBMSs. Much of my post is background for others.

I used to do some classic relational database work before tools like Hadoop, Spark and NoSQL became available.

The standard operations on a single table in a relational database are:

Insert "row". This is similar to Spark union. Typically primary keys in RDBMS tables are indexed to enable quick lookup, so insert is probably not 1-for-1 with union. The row may not simply be appended to the end of the table.

Update a "row"

Delete a "row"

Select "rows where"

RDBMS servers enable row- and table-level locking. Data must always be in a consistent state. You must commit or abort your changes for them to persist and to release locks on the data. Locks are required because you have a single resource and many users requesting service simultaneously. This is very different from Spark.

Storage and memory used to be really expensive, so people often tried to create "1st normal form" schemas, i.e. no duplicate data, to reduce hardware cost. A 1st normal form design requires you to use joins to get the data table you want. Joins are expensive. Designs often duplicated some data to improve performance by minimizing the number of joins required. Duplicate data makes maintaining consistency harder. There are other advantages to normalized data design and, as we are all aware, lots of disadvantages in the big data world. The DBMS ran on a single big machine. Join was not implemented as distributed map/reduce.

So my idea is to use a traditional RDBMS server. My final table will have 5 million rows and 10,114 columns.

1. Read the column vector from each of the 10,114 data files
2. Insert the column vector as a row in the table
   * I read a file that has a single element on each line. All I need to do is replace \n with ,
3. Now I have a table with 10,115 rows and 5 million columns
4. The row id (primary key) is the original file name
5. The columns are the row ids in the original column vectors
6. Now all I need to do is pivot this single table to get what I want. This is the only join or map/reduce like operation
7. A table with 5 million rows and 10,114 columns

My final table is about 220 GB. I know that at Google I have quota for up to 2 mega-mem machines. Each one has something like 1.4 TB of memory.

Kind regards

Andy
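The numbered plan in Andy's message (store each column vector as one row keyed by file name, then pivot) can be sketched in a few lines of plain Python. This is a single-machine illustration, not an RDBMS or Spark implementation; the file names, file contents and row ids are hypothetical, and it assumes, as the thread states, that every file lists its values in the same row order.

```python
# Hypothetical file contents: one value per line, same row order in every file.
files = {
    "sample_a.txt": "1.0\n2.0\n3.0\n",
    "sample_b.txt": "4.0\n5.0\n6.0\n",
}
row_ids = ["r1", "r2", "r3"]  # assumed identical across all files

# Steps 1-2: each column vector becomes one row, keyed by the file name.
wide_rows = {name: text.strip().split("\n") for name, text in files.items()}

# Step 6: pivot (transpose) so each original row id becomes a row again,
# with one column per input file.
column_names = list(wide_rows)
pivoted = {
    row_id: dict(zip(column_names, values))
    for row_id, values in zip(row_ids, zip(*wide_rows.values()))
}
```

At the sizes given in the message, the same layout would hold 5,000,000 x 10,114 = 50.57 billion values, so the transpose in the last step is where essentially all of the data movement happens.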
Re: How is union() implemented? Need to implement column bind
Wait, how is all that related to cbind -- very different from what's needed to insert. BigQuery is unrelated to MR or Spark. It is however a SQL engine, but, can you express this in SQL without joins? I'm just guessing joining 10K+ tables is hard anywhere.
Re: How is union() implemented? Need to implement column bind
I was thinking about something like BigQuery a little more. I do not know how it is implemented. However, I believe traditional relational databases are row oriented and typically run on a single machine. You can lock at the row level. This leads me to speculate that row-level inserts may be more efficient than the way Spark implements union. One way to create my uber matrix would be to read the column vectors from the 10,114 individual files and insert them as rows in a table, then pivot the table. I am going to poke around a bit. For all I know, BigQuery uses map/reduce like Spark.

Kind regards

Andy
Re: How is union() implemented? Need to implement column bind
I don't think there's fundamental disapproval (it is implemented in sparklyr) just a question of how you make this work at scale in general. It's not a super natural operation in this context but can be done. If you find a successful solution at extremes then maybe it generalizes.
Re: How is union() implemented? Need to implement column bind
It would certainly be useful for our domain to have some sort of native cbind(). Is there a fundamental disapproval of adding that functionality, or is it just a matter of nobody implementing it?
Re: How is union() implemented? Need to implement column bind
Good lead; pandas on Spark concat() is worth trying. It looks like it uses a join, but I'm not 100% sure from the source. The SQL concat() function is indeed a different thing.

On Wed, Apr 20, 2022 at 3:24 PM Bjørn Jørgensen wrote:

> Sorry for asking, but why doesn't concat work?
>
> Pandas on Spark has ps.concat
> <https://github.com/apache/spark/blob/1cc2d1641c23f028b5f175f80a695891ff13a6e2/python/pyspark/pandas/namespace.py#L2299>,
> which takes 2 DataFrames and concatenates them into 1 DataFrame.
> It seems
> <https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.functions.concat.html#pyspark.sql.functions.concat>
> that the PySpark version takes 2 columns and concatenates them into one column.
Re: How is union() implemented? Need to implement column bind
Sorry for asking, but why doesn't concat work?

Pandas on Spark has ps.concat <https://github.com/apache/spark/blob/1cc2d1641c23f028b5f175f80a695891ff13a6e2/python/pyspark/pandas/namespace.py#L2299>, which takes 2 DataFrames and concatenates them into 1 DataFrame. It seems <https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.functions.concat.html#pyspark.sql.functions.concat> that the PySpark version takes 2 columns and concatenates them into one column.

On Wed, Apr 20, 2022 at 21:04, Sean Owen wrote:

> cbind? Yeah, though the answer is typically a join. I don't know if there's
> a better option in a SQL engine, as SQL doesn't have anything to offer
> except join and pivot either (? right?)
> Certainly, the dominant data storage paradigm is wide tables, whereas
> you're starting with effectively a huge number of tiny slim tables, which
> is the impedance mismatch here.
Re: How is union() implemented? Need to implement column bind
cbind? Yeah, though the answer is typically a join. I don't know if there's a better option in a SQL engine, as SQL doesn't have anything to offer except join and pivot either (? right?)

Certainly, the dominant data storage paradigm is wide tables, whereas you're starting with effectively a huge number of tiny slim tables, which is the impedance mismatch here.

On Wed, Apr 20, 2022 at 1:51 PM Andrew Davidson wrote:

> Thanks Sean
>
> I imagine this is a fairly common problem in data science. Any idea how
> others solve it? For example, I wonder if running the join on something like
> BigQuery might work better? I do not know much about the implementation.
>
> No one tool will solve all problems. Once I get the matrix, I think Spark
> will work well for our needs.
>
> Kind regards
>
> Andy
Re: How is union() implemented? Need to implement column bind
Thanks Sean

I imagine this is a fairly common problem in data science. Any idea how others solve it? For example, I wonder if running the join on something like BigQuery might work better? I do not know much about the implementation.

No one tool will solve all problems. Once I get the matrix, I think Spark will work well for our needs.

Kind regards

Andy

From: Sean Owen
Date: Monday, April 18, 2022 at 6:58 PM
To: Andrew Davidson
Cc: "user @spark"
Subject: Re: How is union() implemented? Need to implement column bind

> A join is the natural answer, but this is a 10114-way join [...]
Re: How is union() implemented? Need to implement column bind
A join is the natural answer, but this is a 10114-way join, which probably chokes just planning it, let alone all the shuffling of huge data. You could maybe tune your way out of it, but I'm not optimistic. It's just huge.

You could go off-road and lower-level to take advantage of the structure of the data. You effectively want "column bind". There is no such operation in Spark. (union is 'row bind'.) You could do this with zipPartitions, which is in the RDD API and, to my surprise, not in the Python API, but exists in Scala. And R (!). If you can read several RDDs of data, you can use this method to pair all their corresponding values and ultimately get rows of 10114 values out. In fact that is how sparklyr implements cbind on Spark, FWIW: https://rdrr.io/cran/sparklyr/man/sdf_fast_bind_cols.html

The issue I see is that you can only zip a few at a time; you don't want to zip 10114 of them. Perhaps you have to do that iteratively, and I don't know if that is going to face the same issues with huge plans.

I like the pivot idea. If you can read the individual files as data rows (maybe list all the file names, parallelize with Spark, and write a UDF that reads the data for each file to generate the rows), and if you can emit (file, index, value), then groupBy index and pivot on file (I think?), that should be about it. I think it doesn't need additional hashing or whatever. Not sure how fast it is, but that seems more direct than the join as well.

On Mon, Apr 18, 2022 at 8:27 PM Andrew Davidson wrote:

> Hi, I have a hard problem [...]
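For readers following the thread, here is a minimal plain-Python sketch (not Spark) of the data shape behind the (file, index, value) pivot suggested in this reply. The file names, row ids, and values are made up for illustration, and the function name `pivot_long_to_wide` is hypothetical. In Spark the same shape would presumably come from a groupBy on index followed by a pivot on file; this sketch only shows what that operation computes, not how it performs at scale.

```python
from collections import defaultdict

# Hypothetical long-format records, one per (file, index, value).
records = [
    ("colA", "r1", 1.0), ("colA", "r2", 2.0), ("colA", "r3", 3.0),
    ("colB", "r1", 4.0), ("colB", "r2", 5.0), ("colB", "r3", 6.0),
]


def pivot_long_to_wide(records):
    """Group records by index, then spread each group over one column per file."""
    files = sorted({file for file, _, _ in records})
    by_index = defaultdict(dict)
    for file, index, value in records:
        by_index[index][file] = value
    header = ["rowid"] + files
    # A file with no value for an index yields None in that cell.
    rows = [
        [index] + [cols.get(file) for file in files]
        for index, cols in sorted(by_index.items())
    ]
    return header, rows


header, rows = pivot_long_to_wide(records)
```

Unlike a union of differently named columns, every cell is filled here because all values for a row id land in the same group before being spread into columns.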
How is union() implemented? Need to implement column bind
Hi, I have a hard problem.

I have 10114 column vectors, each in a separate file. Each file has 2 columns: the row id and numeric values. The row ids are identical and in sort order. All the column vectors have the same number of rows. There are over 5 million rows. I need to combine them into a single table. The row ids are very long strings. The column names are about 20 chars long.

My current implementation uses join. This takes a long time on a cluster with 2 workers totaling 192 vCPUs and 2.8 TB of memory. It often crashes. I mean totally dead, start over. Checkpoints do not seem to help; it still crashes and needs to be restarted from scratch. What is really surprising is that the final file size is only 213G! The way I got the file was to copy all the column vectors to a single BIG IRON machine and use unix cut and paste. It took about 44 min to run once I got all the data moved around. It was very tedious and error prone. I had to move a lot of data around. Not a particularly reproducible process. I will need to rerun this three more times on different data sets of about the same size.

I noticed that Spark has a union() function. It implements row bind. Any idea how it is implemented? Is it just map reduce under the covers?

My thought was:

1. load each col vector
2. maybe I need to replace the really long row id strings with integers
3. convert column vectors into row vectors using pivot (i.e. matrix transpose)
4. union all the row vectors into a single table
5. pivot the table back so I have the correct column vectors

I could replace the row ids and column names with integers if needed, and restore them later.

Maybe I would be better off using many small machines? I assume memory is the limiting resource, not CPU. I notice that memory usage will reach 100%. I added several TBs of local SSD. I am not convinced that Spark is using the local disk.

Will this perform better than join?

* The rows before the final pivot will be very, very wide (over 5 million columns)
* There will only be 10114 rows before the pivot

I assume the pivots will shuffle all the data. I assume the column vectors are trivial. The file table pivot will be expensive; however, it will only need to be done once.

Comments and suggestions appreciated.

Andy
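Because the row ids are identical and in the same order in every file, the cut-and-paste step described above can be scripted so that it is repeatable. The following is a minimal single-machine sketch in plain Python (not Spark). It assumes tab-separated files with no header line and the row id in the first column; the function name `column_bind` and the tiny demo files are hypothetical.

```python
import csv
import os
import tempfile
from contextlib import ExitStack
from itertools import zip_longest


def column_bind(paths, out_path, delimiter="\t"):
    """Read all inputs in lockstep and write rowid plus every value column."""
    with ExitStack() as stack:
        readers = [
            csv.reader(stack.enter_context(open(p, newline="")), delimiter=delimiter)
            for p in paths
        ]
        out = stack.enter_context(open(out_path, "w", newline=""))
        writer = csv.writer(out, delimiter=delimiter, lineterminator="\n")
        n_rows = 0
        for rows in zip_longest(*readers):
            # zip_longest pads exhausted inputs with None, exposing length mismatches.
            if any(r is None for r in rows):
                raise ValueError("inputs have different row counts")
            row_id = rows[0][0]
            # Fail fast if any file disagrees on the row id at this position.
            if any(r[0] != row_id for r in rows):
                raise ValueError(f"row id mismatch at row {n_rows}")
            merged = [row_id]
            for r in rows:
                merged.extend(r[1:])  # keep all value columns of each input
            writer.writerow(merged)
            n_rows += 1
    return n_rows


# Tiny demo with two made-up column vectors.
with tempfile.TemporaryDirectory() as tmp:
    a = os.path.join(tmp, "colA.tsv")
    b = os.path.join(tmp, "colB.tsv")
    with open(a, "w") as f:
        f.write("r1\t1\nr2\t2\n")
    with open(b, "w") as f:
        f.write("r1\t4\nr2\t5\n")
    out_file = os.path.join(tmp, "wide.tsv")
    n = column_bind([a, b], out_file)
    with open(out_file) as f:
        result = f.read().splitlines()
```

Opening all 10114 inputs at once would likely exceed typical open-file limits, so in practice this would probably run in batches: bind a few hundred files at a time, then bind the intermediate outputs. Taking `r[1:]` rather than a single value column is what lets the intermediate wide files be fed back in.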