Hi Sean

My “insert” solution is a hack that might work given that we can easily spin up 
a single VM with a crazy amount of memory. I would prefer to see a distributed 
solution. It is just a matter of time before someone wants to create an even 
bigger table using cbind.

I understand you probably already know a lot about traditional RDBMSs. Much of 
my post is background for others.

I used to do some classic relational database work before tools like Hadoop, 
Spark and NoSQL became available.

The standard operations on a single table in a relational database are

Insert “row”. This is similar to Spark union. Typically primary keys in 
RDBMS tables are indexed to enable quick lookup. So insert is probably not 1 
for 1 with union. The row may not simply be appended to the end of the table.

Update a “row”
Delete a “row”
Select “rows where”
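
For anyone who has not used an RDBMS, here is a minimal sketch of those four operations using Python's built-in sqlite3 module. The table and column names are made up for illustration, and SQLite is only a stand-in for a real database server.

```python
import sqlite3

# In-memory database; "samples", "name" and "value" are hypothetical names.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE samples (name TEXT PRIMARY KEY, value REAL)")

# Insert: the primary key index is maintained as well, so this is more
# than appending a row to the end of the table.
conn.execute("INSERT INTO samples VALUES (?, ?)", ("a", 1.0))
conn.execute("INSERT INTO samples VALUES (?, ?)", ("b", 2.0))

# Update and delete address individual rows by key.
conn.execute("UPDATE samples SET value = ? WHERE name = ?", (10.0, "a"))
conn.execute("DELETE FROM samples WHERE name = ?", ("b",))

# Select "rows where".
rows = conn.execute(
    "SELECT name, value FROM samples WHERE value > ?", (5.0,)
).fetchall()
print(rows)
```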

RDBMS servers enable row and table level locking. Data must always be in a 
consistent state. You must commit your changes for them to persist, or abort 
them; either way the locks on the data are released. Locks are required because 
you have a single resource and many users requesting service simultaneously. 
This is very different from Spark.
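
A small sketch of commit versus abort, again with sqlite3 as a stand-in and made-up table and key names. It uses a single connection, so it only shows which changes persist; it does not demonstrate lock contention between users.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (k TEXT PRIMARY KEY, v INTEGER)")
conn.commit()

# This change is committed, so it persists.
conn.execute("INSERT INTO t VALUES ('kept', 1)")
conn.commit()

# This change is aborted (rolled back), so it never becomes visible.
conn.execute("INSERT INTO t VALUES ('discarded', 2)")
conn.rollback()

keys = [k for (k,) in conn.execute("SELECT k FROM t ORDER BY k")]
print(keys)
```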

Storage and memory used to be really expensive, so people often tried to create 
normalized (“3rd normal form”) schemas, i.e. no duplicate data, to reduce 
hardware cost. A normalized design requires you to use joins to get the data 
table you want. Joins are expensive. Designs often duplicated some data to 
improve performance by minimizing the number of joins required. Duplicate data 
makes maintaining consistency harder. There are other advantages to normalized 
data design and, as we are all aware, in the big data world lots of 
disadvantages. The DBMS ran on a single big machine. Join was not implemented 
as distributed map/reduce.

So my idea is to use a traditional RDBMS server: my final table will have 5 
million rows and 10,114 columns.

  1.  Read the column vector from each of the 10,114 data files
  2.  Insert the column vector as a row in the table
     *   I read a file that has a single element on each line. All I need to do 
is replace \n with ,
  3.  Now I have a table with 10,114 rows and 5 million columns
  4.  The row id (primary key) is the original file name
  5.  The columns are the row ids in the original column vectors
  6.  Now all I need to do is pivot this single table to get what I want. This 
is the only join or map/reduce like operation
  7.  The result is a table with 5 million rows and 10,114 columns
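
The steps above can be sketched at toy scale in plain Python. This is only an illustration under assumptions: a dict stands in for the RDBMS table, the file names and contents are invented, and the final pivot is done as an in-memory transpose rather than by a database.

```python
# Hypothetical stand-ins for the data files: name -> contents, one value per line.
files = {
    "sampleA.txt": "1\n2\n3\n",
    "sampleB.txt": "4\n5\n6\n",
}

def file_to_row(text):
    """Step 2: turn a one-value-per-line file into one comma-separated row."""
    return text.strip("\n").replace("\n", ",")

# Steps 3-5: one wide row per file, keyed by the original file name.
wide = {name: file_to_row(text) for name, text in files.items()}

def pivot(wide_rows):
    """Step 6: transpose so each original row index becomes an output row."""
    names = list(wide_rows)
    columns = [wide_rows[name].split(",") for name in names]
    return names, [list(row) for row in zip(*columns)]

# Step 7: as many output rows as the files had lines, one column per file.
header, table = pivot(wide)
print(header)
print(table)
```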

My final table is about 220 GB. I know at Google I have quota for up to 2 
mega-mem machines. Each one has something like 1.4 TB of memory.
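
A back-of-the-envelope check on those numbers, as a rough sketch only. It takes 5 million rows exactly (the data has slightly more), treats GB and TB as decimal units, and ignores any database or index overhead. It works out to roughly 4.35 bytes per cell in the text file, and about 405 GB if every cell were stored as an 8-byte float, which is still well under the memory of one such machine.

```python
rows = 5_000_000         # "5 million rows", taken as exact for the estimate
cols = 10_114
table_bytes = 220e9      # "about 220 GB", decimal gigabytes assumed
machine_bytes = 1.4e12   # "something like 1.4 TB", decimal terabytes assumed

cells = rows * cols
bytes_per_cell = table_bytes / cells   # average size of a cell in the text file
as_float64_bytes = cells * 8           # if every cell were an 8-byte float

print(f"{cells:,} cells")
print(f"{bytes_per_cell:.2f} bytes per cell in the text file")
print(f"{as_float64_bytes / 1e9:.1f} GB as 8-byte floats")
print(f"{as_float64_bytes / machine_bytes:.0%} of one machine's memory")
```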

Kind regards


From: Sean Owen <sro...@gmail.com>
Date: Wednesday, April 20, 2022 at 5:34 PM
To: Andrew Davidson <aedav...@ucsc.edu>
Cc: Andrew Melo <andrew.m...@gmail.com>, Bjørn Jørgensen 
<bjornjorgen...@gmail.com>, "user @spark" <user@spark.apache.org>
Subject: Re: How is union() implemented? Need to implement column bind

Wait, how is all that related to cbind -- very different from what's needed to 
BigQuery is unrelated to MR or Spark. It is however a SQL engine, but, can you 
express this in SQL without joins? I'm just guessing joining 10K+ tables is 
hard anywhere.

On Wed, Apr 20, 2022 at 7:32 PM Andrew Davidson <aedav...@ucsc.edu> wrote:
I was thinking about something like BigQuery a little more. I do not know how 
it is implemented. However I believe traditional relational databases are row 
oriented and typically run on a single machine. You can lock at the row level. 
This leads me to speculate that row level inserts may be more efficient than 
the way Spark implements union. One way to create my uber matrix would be to 
read the column vectors from the 10,114 individual files and insert them as 
rows in a table, then pivot the table. I am going to poke around a bit. For all 
I know BigQuery uses map reduce like Spark.

Kind regards


From: Sean Owen <sro...@gmail.com>
Date: Wednesday, April 20, 2022 at 2:31 PM
To: Andrew Melo <andrew.m...@gmail.com>
Cc: Andrew Davidson <aedav...@ucsc.edu>, Bjørn Jørgensen 
<bjornjorgen...@gmail.com>, "user @spark" <user@spark.apache.org>
Subject: Re: How is union() implemented? Need to implement column bind

I don't think there's fundamental disapproval (it is implemented in sparklyr) 
just a question of how you make this work at scale in general. It's not a super 
natural operation in this context but can be done. If you find a successful 
solution at extremes then maybe it generalizes.

On Wed, Apr 20, 2022 at 4:29 PM Andrew Melo <andrew.m...@gmail.com> wrote:
It would certainly be useful for our domain to have some sort of native 
cbind(). Is there a fundamental disapproval of adding that functionality, or is 
it just a matter of nobody implementing it?

On Wed, Apr 20, 2022 at 16:28 Sean Owen <sro...@gmail.com> wrote:
Good lead, pandas on Spark concat() is worth trying. It looks like it uses a 
join, but not 100% sure from the source.
The SQL concat() function is indeed a different thing.

On Wed, Apr 20, 2022 at 3:24 PM Bjørn Jørgensen <bjornjorgen...@gmail.com> 
wrote:
Sorry for asking, but why doesn't concat work?

Pandas on Spark has concat, 
which takes 2 dataframes and concatenates them into 1 dataframe.
The PySpark version takes 2 columns and concatenates them into one column.

On Wed, Apr 20, 2022 at 21:04, Sean Owen wrote:
cbind? yeah though the answer is typically a join. I don't know if there's a 
better option in a SQL engine, as SQL doesn't have anything to offer except 
join and pivot either (? right?)
Certainly, the dominant data storage paradigm is wide tables, whereas you're 
starting with effectively a huge number of tiny slim tables, which is the 
impedance mismatch here.

On Wed, Apr 20, 2022 at 1:51 PM Andrew Davidson <aedav...@ucsc.edu> wrote:
Thanks Sean

I imagine this is a fairly common problem in data science. Any idea how others 
solve it? For example I wonder if running the join on something like BigQuery 
might work better? I do not know much about the implementation.

No one tool will solve all problems. Once I get the matrix I think Spark 
will work well for our needs.

Kind regards


From: Sean Owen <sro...@gmail.com>
Date: Monday, April 18, 2022 at 6:58 PM
To: Andrew Davidson <aedav...@ucsc.edu>
Cc: "user @spark" <user@spark.apache.org>
Subject: Re: How is union() implemented? Need to implement column bind

A join is the natural answer, but this is a 10114-way join, which probably 
chokes readily just to even plan it, let alone all the shuffling of huge data. 
You could tune your way out of it maybe, but I'm not optimistic. It's just 
huge.

You could go off-road and lower-level to take advantage of the structure of the 
data. You effectively want "column bind". There is no such operation in Spark. 
(union is 'row bind'.) You could do this with zipPartitions, which is in the RDD 
API, and to my surprise, not in the Python API but exists in Scala. And R (!). 
If you can read several RDDs of data, you can use this method to pair all their 
corresponding values and ultimately get rows of 10114 values out. In fact that 
is how sparklyr implements cbind on Spark, FWIW: 

The issue I see is that you can only zip a few at a time; you don't want to zip 
10114 of them. Perhaps you have to do that iteratively, and I don't know if 
that is going to face the same issues with huge huge plans.
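
To make the "zip a few at a time, iteratively" idea concrete, here is a sketch in plain Python rather than Spark. Python lists stand in for RDDs, the built-in zip stands in for the Spark zip operation, and the function names and batch size are hypothetical. It assumes the columns are already aligned and of equal length, as described in the original question. It says nothing about whether Spark's query plans stay manageable when this is done at scale.

```python
from itertools import islice

def bind_batch(columns):
    """Pair corresponding values of a few aligned columns into row tuples."""
    return [tuple(values) for values in zip(*columns)]

def cbind_iteratively(columns, batch_size=3):
    """Column-bind many aligned columns, zipping only batch_size at a time."""
    remaining = iter(columns)
    rows = None
    while True:
        batch = list(islice(remaining, batch_size))
        if not batch:
            break
        part = bind_batch(batch)
        if rows is None:
            rows = part
        else:
            # Widen the rows built so far with the next few columns.
            rows = [left + right for left, right in zip(rows, part)]
    return rows or []

columns = [[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]]
result = cbind_iteratively(columns, batch_size=2)
print(result)
```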

I like the pivot idea. If you can read the individual files as data rows (maybe 
list all the file names, parallelize with Spark, write a UDF that reads the 
data for that file to generate the rows). If you can emit (file, index, value) 
and groupBy index, pivot on file (I think?) that should be about it? I think it 
doesn't need additional hashing or whatever. Not sure how fast it is but that 
seems more direct than the join, as well.
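
A toy sketch of that (file, index, value) idea in plain Python, just to show the shape of the data at each step. The file names and values are invented, and a dict of lists stands in for reading the files. In Spark the grouping and pivot would presumably be a groupBy on index followed by a pivot on file; that part is not shown here.

```python
from collections import defaultdict

def emit_records(files):
    """Yield (file, index, value) for every value in every column vector."""
    for name, values in files.items():
        for index, value in enumerate(values):
            yield name, index, value

def pivot_on_file(records, file_names):
    """Group by index, then spread each group into one column per file."""
    by_index = defaultdict(dict)
    for name, index, value in records:
        by_index[index][name] = value
    return [[by_index[i].get(n) for n in file_names] for i in sorted(by_index)]

# Hypothetical input: three column vectors with two rows each.
files = {"f1": [0.1, 0.2], "f2": [1.1, 1.2], "f3": [2.1, 2.2]}
matrix = pivot_on_file(emit_records(files), list(files))
print(matrix)
```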

On Mon, Apr 18, 2022 at 8:27 PM Andrew Davidson <aedav...@ucsc.edu.invalid> 
wrote:
Hi, I have a hard problem

I have 10114 column vectors each in a separate file. The file has 2 columns, 
the row id, and numeric values. The row ids are identical and in sorted order. 
All the column vectors have the same number of rows. There are over 5 million 
rows.  I need to combine them into a single table. The row ids are very long 
strings. The column names are about 20 chars long.

My current implementation uses join. This takes a long time on a cluster with 2 
workers totaling 192 vCPUs and 2.8 TB of memory. It often crashes. I mean 
totally dead, start over. Checkpoints do not seem to help. It still crashes and 
needs to be restarted from scratch. What is really surprising is that the final 
file size is only 213G! The way I got the file was to copy all the column 
vectors to a single BIG IRON machine and use unix cut and paste. It took about 
44 min to run once I got all the data moved around. It was very tedious and 
error prone. I had to move a lot of data around. Not a particularly 
reproducible process. I will need to rerun this three more times on different 
data sets of about the same size.
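
For reproducibility, the cut-and-paste step could be scripted. Below is a minimal single-machine sketch in Python, not what was actually run. It assumes tab-separated files with exactly two fields per line (row id, value), and the function name and demo file names are made up. It reads all files in lockstep, so memory use stays small, but opening 10114 files at once may require raising the OS open-file limit or binding in batches.

```python
import os
import tempfile
from contextlib import ExitStack

def column_bind(paths, out_path, sep="\t"):
    """Stream aligned two-column files into one wide table, a row at a time."""
    with ExitStack() as stack, open(out_path, "w") as out:
        handles = [stack.enter_context(open(p)) for p in paths]
        for lines in zip(*handles):
            fields = [line.rstrip("\n").split(sep) for line in lines]
            row_id = fields[0][0]
            # The approach relies on identical row ids in identical order.
            if any(f[0] != row_id for f in fields):
                raise ValueError(f"row ids differ at {row_id!r}")
            out.write(sep.join([row_id] + [f[1] for f in fields]) + "\n")

# Tiny demo with two made-up files.
tmp = tempfile.mkdtemp()
paths = []
for name, values in [("a.tsv", [1, 2]), ("b.tsv", [3, 4])]:
    path = os.path.join(tmp, name)
    with open(path, "w") as fh:
        for i, v in enumerate(values):
            fh.write(f"id{i}\t{v}\n")
    paths.append(path)

out_path = os.path.join(tmp, "wide.tsv")
column_bind(paths, out_path)
with open(out_path) as fh:
    wide_text = fh.read()
print(wide_text)
```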

I noticed that Spark has a union() function. It implements row bind. Any idea 
how it is implemented? Is it just map reduce under the covers?

My thought was

1.      load each col vector

2.      maybe I need to replace the really long row id strings with integers

3.      convert column vectors into row vectors using pivot (i.e. matrix 
transpose)

4.      union all the row vectors into a single table

5.      pivot the table back so I have the correct column vectors

I could replace the row ids and column name with integers if needed, and 
restore them later
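
Replacing the ids with integers and restoring them later could look like the sketch below. The function names and example ids are hypothetical; the point is only that a lookup list is enough to get the original strings back.

```python
def encode_ids(ids):
    """Map each distinct string to a small integer; return codes and lookup."""
    lookup = []    # position -> original string
    code_of = {}   # original string -> position
    codes = []
    for s in ids:
        if s not in code_of:
            code_of[s] = len(lookup)
            lookup.append(s)
        codes.append(code_of[s])
    return codes, lookup

def decode_ids(codes, lookup):
    """Restore the original strings from their integer codes."""
    return [lookup[c] for c in codes]

row_ids = [
    "a_very_long_row_id_0001",
    "a_very_long_row_id_0002",
    "a_very_long_row_id_0001",
]
codes, lookup = encode_ids(row_ids)
print(codes)
print(decode_ids(codes, lookup) == row_ids)
```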

Maybe I would be better off using many small machines? I assume memory is the 
limiting resource, not CPU. I notice that memory usage will reach 100%. I added 
several TB of local SSD. I am not convinced that Spark is using the local disk.

Will this perform better than join?

• The rows before the final pivot will be very very wide (over 5 million 
columns)

• There will only be 10114 rows before the pivot

I assume the pivots will shuffle all the data. I assume pivoting the column 
vectors is trivial. The final table pivot will be expensive; however, it will 
only need to be done once.

Comments and suggestions appreciated

