Re: How is union() implemented? Need to implement column bind

2022-04-21 Thread Sean Owen
Not a max - all values are needed. pivot() if anything is much closer, but
see the rest of this thread.

On Thu, Apr 21, 2022 at 1:19 AM Sonal Goyal  wrote:

> Seems like an interesting problem to solve!
>
> If I have understood it correctly, you have 10114 files each with the
> structure
>
> rowid, colA
> r1, a
> r2, b
> r3, c
> ...5 million rows
>
> if you union them, you will have
> rowid, colA, colB
> r1, a, null
> r2, b, null
> r3, c, null
> r1, null, d
> r2, null, e
> r3, null, f
>
> Will a window partition by rowid and max on column values not work?
>
> Cheers,
> Sonal
> https://github.com/zinggAI/zingg
>
>
>
> On Thu, Apr 21, 2022 at 6:50 AM Sean Owen  wrote:
>
>> Oh, Spark directly supports upserts (with the right data destination) and
>> yeah you could do this as 1+ updates to a table without any pivoting,
>> etc. It'd still end up being 10K+ single joins along the way but individual
>> steps are simpler. It might actually be pretty efficient I/O wise as
>> columnar formats would not rewrite any other data on a write like this.
>>
>> On Wed, Apr 20, 2022 at 8:09 PM Andrew Davidson 
>> wrote:
>>
>>> Hi Sean
>>>
>>>
>>>
>>> My “insert” solution is a hack that might work given we can easily spin
>>> up a single VM with a crazy amount of memory. I would prefer to see a
>>> distributed solution. It is just a matter of time before someone wants to
>>> create an even bigger table using cbind.
>>>
>>>
>>>
>>> I understand you probably already know a lot about traditional RDBMSs.
>>> Much of my post is background for others.
>>>
>>>
>>>
>>> I used to do some classic relational database work before tools like
>>> Hadoop, Spark and NoSQL became available.
>>>
>>>
>>>
>>> The standard operations on a single table in a relational database are:
>>>
>>>
>>>
>>> Insert “row”. This is similar to Spark union. Typically primary keys
>>> in RDBMS tables are indexed to enable quick lookup, so insert is probably
>>> not one-for-one with union; the row may not simply be appended to the
>>> end of the table.
>>>
>>>
>>>
>>> Update a “row”
>>>
>>> Delete a “row”
>>>
>>> Select “rows where”
>>>
>>>
>>>
>>> RDBMS servers enable row- and table-level locking. Data must always be in
>>> a consistent state. You must commit or abort your changes for them to
>>> persist and to release locks on the data. Locks are required because you
>>> have a single resource and many users requesting service simultaneously.
>>> This is very different from Spark.
>>>
>>>
>>>
>>> Storage and memory used to be really expensive, so people often tried to
>>> create normalized (“normal form”) schemas, i.e. no duplicate data, to
>>> reduce hardware cost. A normalized design requires you to use joins to get
>>> the data table you want. Joins are expensive, so designs often duplicated
>>> some data to improve performance by minimizing the number of joins
>>> required. Duplicate data makes maintaining consistency harder. There are
>>> other advantages to normalized data design and, as we are all aware, in
>>> the big-data world lots of disadvantages. The DBMS ran on a single big
>>> machine; join was not implemented as distributed map/reduce.
>>>
>>>
>>>
>>> So my idea is to use a traditional RDBMS server: my final table will have
>>> 5 million rows and 10,114 columns.
>>>
>>>1. Read the column vector from each of 10,114 data files
>>>2. insert the column vector as a row in the table
>>>   1. I read a file that has a single element on each line. All I
>>>   need to do is replace \n with ,
>>>3. Now I have table with 10,115 rows and 5 million columns
>>>4. The row id (primary key) is the original file name
>>>5. The columns are the row ids in the original column vectors
>>>6. Now all I need to do is pivot this single table to get what I
>>>want. This is the only join or map/reduce like operation
>>>7. A table with 5million rows and 10,114 columns
>>>
>>>
>>>
>>>
>>>
>>> My final table is about 220 GB. I know at Google I have quota for up to
>>> 2 mega-mem machines. Each one has something like 1.4 TB of memory.
>>>
>>>
>>>
>>> Kind regards
>>>
>>>
>>>
>>> Andy
>>>
>>>
>>>
>>>


Re: How is union() implemented? Need to implement column bind

2022-04-21 Thread Sonal Goyal
Seems like an interesting problem to solve!

If I have understood it correctly, you have 10114 files each with the
structure

rowid, colA
r1, a
r2, b
r3, c
...5 million rows

if you union them, you will have
rowid, colA, colB
r1, a, null
r2, b, null
r3, c, null
r1, null, d
r2, null, e
r3, null, f

Will a window partition by rowid and max on column values not work?
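Roughly something like this (just a sketch; file names and the per-file column
names are illustrative, and it assumes Spark 3.1+ for allowMissingColumns):

    from functools import reduce
    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.getOrCreate()

    # hypothetical: one small frame per input file, each (rowid, <file-specific column>)
    paths = ["fileA.csv", "fileB.csv"]
    dfs = [spark.read.csv(p, header=True) for p in paths]

    # union with missing columns filled as null, then collapse per rowid;
    # each column has exactly one non-null value per rowid, so first/max picks it up
    unioned = reduce(lambda a, b: a.unionByName(b, allowMissingColumns=True), dfs)
    wide = unioned.groupBy("rowid").agg(
        *[F.first(c, ignorenulls=True).alias(c) for c in unioned.columns if c != "rowid"]
    )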

Cheers,
Sonal
https://github.com/zinggAI/zingg



On Thu, Apr 21, 2022 at 6:50 AM Sean Owen  wrote:

> Oh, Spark directly supports upserts (with the right data destination) and
> yeah you could do this as 1+ updates to a table without any pivoting,
> etc. It'd still end up being 10K+ single joins along the way but individual
> steps are simpler. It might actually be pretty efficient I/O wise as
> columnar formats would not rewrite any other data on a write like this.
>
> On Wed, Apr 20, 2022 at 8:09 PM Andrew Davidson  wrote:
>
>> Hi Sean
>>
>>
>>
>> My “insert” solution is a hack that might work given we can easily spin up
>> a single VM with a crazy amount of memory. I would prefer to see a
>> distributed solution. It is just a matter of time before someone wants to
>> create an even bigger table using cbind.
>>
>>
>>
>> I understand you probably already know a lot about traditional RDBMSs.
>> Much of my post is background for others.
>>
>>
>>
>> I used to do some classic relational database work before tools like
>> Hadoop, Spark and NoSQL became available.
>>
>>
>>
>> The standard operations on a single table in a relational database are:
>>
>>
>>
>> Insert “row”. This is similar to Spark union. Typically primary keys in
>> RDBMS tables are indexed to enable quick lookup, so insert is probably
>> not one-for-one with union; the row may not simply be appended to the
>> end of the table.
>>
>>
>>
>> Update a “row”
>>
>> Delete a “row”
>>
>> Select “rows where”
>>
>>
>>
>> RDBMS servers enable row- and table-level locking. Data must always be in a
>> consistent state. You must commit or abort your changes for them to persist
>> and to release locks on the data. Locks are required because you have a
>> single resource and many users requesting service simultaneously. This is
>> very different from Spark.
>>
>>
>>
>> Storage and memory used to be really expensive, so people often tried to
>> create normalized (“normal form”) schemas, i.e. no duplicate data, to reduce
>> hardware cost. A normalized design requires you to use joins to get the data
>> table you want. Joins are expensive, so designs often duplicated some data
>> to improve performance by minimizing the number of joins required. Duplicate
>> data makes maintaining consistency harder. There are other advantages to
>> normalized data design and, as we are all aware, in the big-data world lots
>> of disadvantages. The DBMS ran on a single big machine; join was not
>> implemented as distributed map/reduce.
>>
>>
>>
>> So my idea is to use a traditional RDBMS server: my final table will have 5
>> million rows and 10,114 columns.
>>
>>1. Read the column vector from each of 10,114 data files
>>2. insert the column vector as a row in the table
>>   1. I read a file that has a single element on each line. All I
>>   need to do is replace \n with ,
>>3. Now I have table with 10,115 rows and 5 million columns
>>4. The row id (primary key) is the original file name
>>5. The columns are the row ids in the original column vectors
>>6. Now all I need to do is pivot this single table to get what I
>>want. This is the only join or map/reduce like operation
>>7. A table with 5million rows and 10,114 columns
>>
>>
>>
>>
>>
>> My final table is about 220 GB. I know at Google I have quota for up to 2
>> mega-mem machines. Each one has something like 1.4 TB of memory.
>>
>>
>>
>> Kind regards
>>
>>
>>
>> Andy
>>
>>
>>
>>


Re: How is union() implemented? Need to implement column bind

2022-04-20 Thread Sean Owen
Oh, Spark directly supports upserts (with the right data destination) and
yeah you could do this as 1+ updates to a table without any pivoting,
etc. It'd still end up being 10K+ single joins along the way but individual
steps are simpler. It might actually be pretty efficient I/O-wise, as
columnar formats would not rewrite any other data on a write like this.
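For example, if the destination were a Delta table keyed by rowid, one per-file
MERGE might look roughly like this (only a sketch, not Spark core: it assumes the
delta-spark package, schema auto-merge enabled, and made-up paths and column names):

    from delta.tables import DeltaTable
    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.getOrCreate()
    # assumes the session was built with the Delta extensions configured
    spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

    target = DeltaTable.forPath(spark, "/tmp/wide_table")   # hypothetical, seeded with rowid

    def upsert_one_file(path, col_name):
        # one column vector as (rowid, <col_name>)
        src = (spark.read.csv(path, header=True)
                    .select("rowid", F.col("value").alias(col_name)))
        # updateAll/insertAll plus auto-merge adds the new column and fills it per rowid
        (target.alias("t")
               .merge(src.alias("s"), "t.rowid = s.rowid")
               .whenMatchedUpdateAll()
               .whenNotMatchedInsertAll()
               .execute())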

On Wed, Apr 20, 2022 at 8:09 PM Andrew Davidson  wrote:

> Hi Sean
>
>
>
> My “insert” solution is a hack that might work given we can easily spin up
> a single VM with a crazy amount of memory. I would prefer to see a
> distributed solution. It is just a matter of time before someone wants to
> create an even bigger table using cbind.
>
>
>
> I understand you probably already know a lot about traditional RDBMSs.
> Much of my post is background for others.
>
>
>
> I used to do some classic relational database work before tools like
> Hadoop, Spark and NoSQL became available.
>
>
>
> The standard operations on a single table in a relational database are:
>
>
>
> Insert “row”. This is similar to Spark union. Typically primary keys in
> RDBMS tables are indexed to enable quick lookup, so insert is probably
> not one-for-one with union; the row may not simply be appended to the
> end of the table.
>
>
>
> Update a “row”
>
> Delete a “row”
>
> Select “rows where”
>
>
>
> RDBMS servers enable row- and table-level locking. Data must always be in a
> consistent state. You must commit or abort your changes for them to persist
> and to release locks on the data. Locks are required because you have a
> single resource and many users requesting service simultaneously. This is
> very different from Spark.
>
>
>
> Storage and memory used to be really expensive, so people often tried to
> create normalized (“normal form”) schemas, i.e. no duplicate data, to reduce
> hardware cost. A normalized design requires you to use joins to get the data
> table you want. Joins are expensive, so designs often duplicated some data
> to improve performance by minimizing the number of joins required. Duplicate
> data makes maintaining consistency harder. There are other advantages to
> normalized data design and, as we are all aware, in the big-data world lots
> of disadvantages. The DBMS ran on a single big machine; join was not
> implemented as distributed map/reduce.
>
>
>
> So my idea is to use a traditional RDBMS server: my final table will have
> 5 million rows and 10,114 columns.
>
>1. Read the column vector from each of 10,114 data files
>2. insert the column vector as a row in the table
>   1. I read a file that has a single element on each line. All I need
>   to do is replace \n with ,
>3. Now I have table with 10,115 rows and 5 million columns
>4. The row id (primary key) is the original file name
>5. The columns are the row ids in the original column vectors
>6. Now all I need to do is pivot this single table to get what I want.
>This is the only join or map/reduce like operation
>7. A table with 5million rows and 10,114 columns
>
>
>
>
>
> My final table is about 220 GB. I know at Google I have quota for up to 2
> mega-mem machines. Each one has something like 1.4 TB of memory.
>
>
>
> Kind regards
>
>
>
> Andy
>
>
>
>


Re: How is union() implemented? Need to implement column bind

2022-04-20 Thread Andrew Davidson
Hi Sean

My “insert” solution is a hack that might work given we can easily spin up a
single VM with a crazy amount of memory. I would prefer to see a distributed
solution. It is just a matter of time before someone wants to create an even
bigger table using cbind.

I understand you probably already know a lot about traditional RDBMSs. Much of
my post is background for others.

I used to do some classic relational database work before tools like Hadoop,
Spark and NoSQL became available.

The standard operations on a single table in a relational database are:

Insert “row”. This is similar to Spark union. Typically primary keys in RDBMS
tables are indexed to enable quick lookup, so insert is probably not one-for-one
with union; the row may not simply be appended to the end of the table.

Update a “row”
Delete a “row”
Select “rows where”

RDBMS servers enable row- and table-level locking. Data must always be in a
consistent state. You must commit or abort your changes for them to persist and
to release locks on the data. Locks are required because you have a single
resource and many users requesting service simultaneously. This is very
different from Spark.

Storage and memory used to be really expensive, so people often tried to create
normalized (“normal form”) schemas, i.e. no duplicate data, to reduce hardware
cost. A normalized design requires you to use joins to get the data table you
want. Joins are expensive, so designs often duplicated some data to improve
performance by minimizing the number of joins required. Duplicate data makes
maintaining consistency harder. There are other advantages to normalized data
design and, as we are all aware, in the big-data world lots of disadvantages.
The DBMS ran on a single big machine; join was not implemented as distributed
map/reduce.

So my idea is to use a traditional RDBMS server: my final table will have
5 million rows and 10,114 columns.

  1.  Read the column vector from each of 10,114 data files
  2.  Insert the column vector as a row in the table
 *   I read a file that has a single element on each line. All I need to do
is replace \n with , (see the sketch below)
  3.  Now I have a table with 10,115 rows and 5 million columns
  4.  The row id (primary key) is the original file name
  5.  The columns are the row ids in the original column vectors
  6.  Now all I need to do is pivot this single table to get what I want. This
is the only join or map/reduce-like operation
  7.  A table with 5 million rows and 10,114 columns
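A tiny Python sketch of step 2a (the directory and file names are made up):

    import csv
    from pathlib import Path

    def file_to_row(path):
        # one column-vector file -> one wide row: [file_name, v1, v2, ..., v5M]
        values = Path(path).read_text().split()      # a single element on each line
        return [Path(path).stem] + values

    # one big CSV with 10,114 rows that a bulk loader (e.g. COPY) can ingest
    with open("wide_rows.csv", "w", newline="") as out:
        writer = csv.writer(out)
        for p in sorted(Path("quant_files/").glob("*.txt")):
            writer.writerow(file_to_row(str(p)))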


My final table is about 220 GB. I know at Google I have quota for up to 2
mega-mem machines. Each one has something like 1.4 TB of memory.

Kind regards

Andy


From: Sean Owen 
Date: Wednesday, April 20, 2022 at 5:34 PM
To: Andrew Davidson 
Cc: Andrew Melo , Bjørn Jørgensen 
, "user @spark" 
Subject: Re: How is union() implemented? Need to implement column bind

Wait, how is all that related to cbind -- very different from what's needed to 
insert.
BigQuery is unrelated to MR or Spark. It is however a SQL engine, but, can you 
express this in SQL without joins? I'm just guessing joining 10K+ tables is 
hard anywhere.

On Wed, Apr 20, 2022 at 7:32 PM Andrew Davidson <aedav...@ucsc.edu> wrote:
I was thinking about something like BigQuery a little more. I do not know how
it is implemented. However, I believe traditional relational databases are row
oriented and typically run on a single machine. You can lock at the row level.
This leads me to speculate that row-level inserts may be more efficient than the
way Spark implements union. One way to create my uber matrix would be to read
the column vectors from the 10,114 individual files and insert them as rows in
a table, then pivot the table. I am going to poke around a bit. For all I know
BigQuery uses map/reduce like Spark.

Kind regards

Andy

From: Sean Owen <sro...@gmail.com>
Date: Wednesday, April 20, 2022 at 2:31 PM
To: Andrew Melo <andrew.m...@gmail.com>
Cc: Andrew Davidson <aedav...@ucsc.edu>, Bjørn Jørgensen <bjornjorgen...@gmail.com>, "user @spark" <user@spark.apache.org>
Subject: Re: How is union() implemented? Need to implement column bind

I don't think there's fundamental disapproval (it is implemented in sparklyr) 
just a question of how you make this work at scale in general. It's not a super 
natural operation in this context but can be done. If you find a successful 
solution at extremes then maybe it generalizes.

On Wed, Apr 20, 2022 at 4:29 PM Andrew Melo <andrew.m...@gmail.com> wrote:
It would certainly be useful for our domain to have some sort of native 
cbind(). Is there a fundamental disapproval of adding that functionality, or is 
it just a matter of nobody implementing it?

On Wed, Apr 20, 2022 at 16:28 Sean Owen <sro...@gmail.com> wrote:
Good lead, pandas on Spark concat() is worth trying. It looks like it uses a 
join, but not 100% sure from the source.
Th

Re: How is union() implemented? Need to implement column bind

2022-04-20 Thread Sean Owen
Wait, how is all that related to cbind -- very different from what's needed
to insert.
BigQuery is unrelated to MR or Spark. It is however a SQL engine, but, can
you express this in SQL without joins? I'm just guessing joining 10K+
tables is hard anywhere.

On Wed, Apr 20, 2022 at 7:32 PM Andrew Davidson  wrote:

> I was thinking about something like BigQuery a little more. I do not know
> how it is implemented. However, I believe traditional relational databases
> are row oriented and typically run on a single machine. You can lock at the
> row level. This leads me to speculate that row-level inserts may be more
> efficient than the way Spark implements union. One way to create my uber
> matrix would be to read the column vectors from the 10,114 individual
> files and insert them as rows in a table, then pivot the table. I am going
> to poke around a bit. For all I know BigQuery uses map/reduce like Spark.
>
>
>
> Kind regards
>
>
>
> Andy
>
>
>
> *From: *Sean Owen 
> *Date: *Wednesday, April 20, 2022 at 2:31 PM
> *To: *Andrew Melo 
> *Cc: *Andrew Davidson , Bjørn Jørgensen <
> bjornjorgen...@gmail.com>, "user @spark" 
> *Subject: *Re: How is union() implemented? Need to implement column bind
>
>
>
> I don't think there's fundamental disapproval (it is implemented in
> sparklyr) just a question of how you make this work at scale in general.
> It's not a super natural operation in this context but can be done. If you
> find a successful solution at extremes then maybe it generalizes.
>
>
>
> On Wed, Apr 20, 2022 at 4:29 PM Andrew Melo  wrote:
>
> It would certainly be useful for our domain to have some sort of native
> cbind(). Is there a fundamental disapproval of adding that functionality,
> or is it just a matter of nobody implementing it?
>
>
>
> On Wed, Apr 20, 2022 at 16:28 Sean Owen  wrote:
>
> Good lead, pandas on Spark concat() is worth trying. It looks like it uses
> a join, but not 100% sure from the source.
>
> The SQL concat() function is indeed a different thing.
>
>
>
> On Wed, Apr 20, 2022 at 3:24 PM Bjørn Jørgensen 
> wrote:
>
> Sorry for asking. But why does`t concat work?
>
>
>
> Pandas on spark have ps.concat
> <https://github.com/apache/spark/blob/1cc2d1641c23f028b5f175f80a695891ff13a6e2/python/pyspark/pandas/namespace.py#L2299>
>  which
> takes 2 dataframes and concat them to 1 dataframe.
>
> It seems
> <https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.functions.concat.html#pyspark.sql.functions.concat>
> like the pyspark version takes 2 columns and concat it to one column.
>
>
>
> ons. 20. apr. 2022 kl. 21:04 skrev Sean Owen :
>
> cbind? yeah though the answer is typically a join. I don't know if there's
> a better option in a SQL engine, as SQL doesn't have anything to offer
> except join and pivot either (? right?)
>
> Certainly, the dominant data storage paradigm is wide tables, whereas
> you're starting with effectively a huge number of tiny slim tables, which
> is the impedance mismatch here.
>
>
>
> On Wed, Apr 20, 2022 at 1:51 PM Andrew Davidson  wrote:
>
> Thanks Sean
>
>
>
> I imagine this is a fairly common problem in data science. Any idea how
> other solve?  For example I wonder if running join something like BigQuery
> might work better? I do not know much about the implementation.
>
>
>
> No one tool will  solve all problems. Once I get the matrix I think it
> spark will work well for our need
>
>
>
> Kind regards
>
>
>
> Andy
>
>
>
> *From: *Sean Owen 
> *Date: *Monday, April 18, 2022 at 6:58 PM
> *To: *Andrew Davidson 
> *Cc: *"user @spark" 
> *Subject: *Re: How is union() implemented? Need to implement column bind
>
>
>
> A join is the natural answer, but this is a 10114-way join, which probably
> chokes readily just to even plan it, let alone all the shuffling and
> shuffling of huge data. You could tune your way out of it maybe, but not
> optimistic. It's just huge.
>
>
>
> You could go off-road and lower-level to take advantage of the structure
> of the data. You effectively want "column bind". There is no such operation
> in Spark. (union is 'row bind'.) You could do this with zipPartition, which
> is in the RDD API, and to my surprise, not in the Python API but exists in
> Scala. And R (!). If you can read several RDDs of data, you can use this
> method to pair all their corresponding values and ultimately get rows of
> 10114 values out. In fact that is how sparklyr implements cbind on Spark,
> FWIW: https://rdrr.io/cran/sparklyr/man/sdf_fast_bind_cols.html
>
>
>
> The issue I see is that you ca

Re: How is union() implemented? Need to implement column bind

2022-04-20 Thread Andrew Davidson
I was thinking about something like BigQuery a little more. I do not know how
it is implemented. However, I believe traditional relational databases are row
oriented and typically run on a single machine. You can lock at the row level.
This leads me to speculate that row-level inserts may be more efficient than the
way Spark implements union. One way to create my uber matrix would be to read
the column vectors from the 10,114 individual files and insert them as rows in
a table, then pivot the table. I am going to poke around a bit. For all I know
BigQuery uses map/reduce like Spark.

Kind regards

Andy

From: Sean Owen 
Date: Wednesday, April 20, 2022 at 2:31 PM
To: Andrew Melo 
Cc: Andrew Davidson , Bjørn Jørgensen 
, "user @spark" 
Subject: Re: How is union() implemented? Need to implement column bind

I don't think there's fundamental disapproval (it is implemented in sparklyr) 
just a question of how you make this work at scale in general. It's not a super 
natural operation in this context but can be done. If you find a successful 
solution at extremes then maybe it generalizes.

On Wed, Apr 20, 2022 at 4:29 PM Andrew Melo <andrew.m...@gmail.com> wrote:
It would certainly be useful for our domain to have some sort of native 
cbind(). Is there a fundamental disapproval of adding that functionality, or is 
it just a matter of nobody implementing it?

On Wed, Apr 20, 2022 at 16:28 Sean Owen <sro...@gmail.com> wrote:
Good lead, pandas on Spark concat() is worth trying. It looks like it uses a 
join, but not 100% sure from the source.
The SQL concat() function is indeed a different thing.

On Wed, Apr 20, 2022 at 3:24 PM Bjørn Jørgensen <bjornjorgen...@gmail.com> wrote:
Sorry for asking. But why does`t concat work?

Pandas on spark have 
ps.concat<https://github.com/apache/spark/blob/1cc2d1641c23f028b5f175f80a695891ff13a6e2/python/pyspark/pandas/namespace.py#L2299>
 which takes 2 dataframes and concat them to 1 dataframe.
It 
seems<https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.functions.concat.html#pyspark.sql.functions.concat>
 like the pyspark version takes 2 columns and concat it to one column.

ons. 20. apr. 2022 kl. 21:04 skrev Sean Owen <sro...@gmail.com>:
cbind? yeah though the answer is typically a join. I don't know if there's a 
better option in a SQL engine, as SQL doesn't have anything to offer except 
join and pivot either (? right?)
Certainly, the dominant data storage paradigm is wide tables, whereas you're 
starting with effectively a huge number of tiny slim tables, which is the 
impedance mismatch here.

On Wed, Apr 20, 2022 at 1:51 PM Andrew Davidson <aedav...@ucsc.edu> wrote:
Thanks Sean

I imagine this is a fairly common problem in data science. Any idea how other 
solve?  For example I wonder if running join something like BigQuery might work 
better? I do not know much about the implementation.

No one tool will  solve all problems. Once I get the matrix I think it spark 
will work well for our need

Kind regards

Andy

From: Sean Owen <sro...@gmail.com>
Date: Monday, April 18, 2022 at 6:58 PM
To: Andrew Davidson <aedav...@ucsc.edu>
Cc: "user @spark" <user@spark.apache.org>
Subject: Re: How is union() implemented? Need to implement column bind

A join is the natural answer, but this is a 10114-way join, which probably 
chokes readily just to even plan it, let alone all the shuffling and shuffling 
of huge data. You could tune your way out of it maybe, but not optimistic. It's 
just huge.

You could go off-road and lower-level to take advantage of the structure of the 
data. You effectively want "column bind". There is no such operation in Spark. 
(union is 'row bind'.) You could do this with zipPartition, which is in the RDD 
API, and to my surprise, not in the Python API but exists in Scala. And R (!). 
If you can read several RDDs of data, you can use this method to pair all their 
corresponding values and ultimately get rows of 10114 values out. In fact that 
is how sparklyr implements cbind on Spark, FWIW: 
https://rdrr.io/cran/sparklyr/man/sdf_fast_bind_cols.html

The issue I see is that you can only zip a few at a time; you don't want to zip 
10114 of them. Perhaps you have to do that iteratively, and I don't know if 
that is going to face the same issues with huge huge plans.

I like the pivot idea. If you can read the individual files as data rows (maybe 
list all the file names, parallelize with Spark, write a UDF that reads the 
data for that file to generate the rows). If you can emit (file, index, value) 
and groupBy index, pivot on file (I think?) that should be about it? I think it 
doesn't need additional hashing or whatever. Not sure how fast it is but that 
seems more direct than the join, as well.

On Mon, Apr 18, 2022 at 8:27 PM Andrew Davidson  
wrote:
Hi have a hard problem

Re: How is union() implemented? Need to implement column bind

2022-04-20 Thread Sean Owen
I don't think there's fundamental disapproval (it is implemented in
sparklyr) just a question of how you make this work at scale in general.
It's not a super natural operation in this context but can be done. If you
find a successful solution at extremes then maybe it generalizes.

On Wed, Apr 20, 2022 at 4:29 PM Andrew Melo  wrote:

> It would certainly be useful for our domain to have some sort of native
> cbind(). Is there a fundamental disapproval of adding that functionality,
> or is it just a matter of nobody implementing it?
>
> On Wed, Apr 20, 2022 at 16:28 Sean Owen  wrote:
>
>> Good lead, pandas on Spark concat() is worth trying. It looks like it
>> uses a join, but not 100% sure from the source.
>> The SQL concat() function is indeed a different thing.
>>
>> On Wed, Apr 20, 2022 at 3:24 PM Bjørn Jørgensen 
>> wrote:
>>
>>> Sorry for asking. But why does`t concat work?
>>>
>>> Pandas on spark have ps.concat
>>> <https://github.com/apache/spark/blob/1cc2d1641c23f028b5f175f80a695891ff13a6e2/python/pyspark/pandas/namespace.py#L2299>
>>>  which
>>> takes 2 dataframes and concat them to 1 dataframe.
>>> It seems
>>> <https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.functions.concat.html#pyspark.sql.functions.concat>
>>> like the pyspark version takes 2 columns and concat it to one column.
>>>
>>> ons. 20. apr. 2022 kl. 21:04 skrev Sean Owen :
>>>
>>>> cbind? yeah though the answer is typically a join. I don't know if
>>>> there's a better option in a SQL engine, as SQL doesn't have anything to
>>>> offer except join and pivot either (? right?)
>>>> Certainly, the dominant data storage paradigm is wide tables, whereas
>>>> you're starting with effectively a huge number of tiny slim tables, which
>>>> is the impedance mismatch here.
>>>>
>>>> On Wed, Apr 20, 2022 at 1:51 PM Andrew Davidson 
>>>> wrote:
>>>>
>>>>> Thanks Sean
>>>>>
>>>>>
>>>>>
>>>>> I imagine this is a fairly common problem in data science. Any idea
>>>>> how other solve?  For example I wonder if running join something like
>>>>> BigQuery might work better? I do not know much about the implementation.
>>>>>
>>>>>
>>>>>
>>>>> No one tool will  solve all problems. Once I get the matrix I think it
>>>>> spark will work well for our need
>>>>>
>>>>>
>>>>>
>>>>> Kind regards
>>>>>
>>>>>
>>>>>
>>>>> Andy
>>>>>
>>>>>
>>>>>
>>>>> *From: *Sean Owen 
>>>>> *Date: *Monday, April 18, 2022 at 6:58 PM
>>>>> *To: *Andrew Davidson 
>>>>> *Cc: *"user @spark" 
>>>>> *Subject: *Re: How is union() implemented? Need to implement column
>>>>> bind
>>>>>
>>>>>
>>>>>
>>>>> A join is the natural answer, but this is a 10114-way join, which
>>>>> probably chokes readily just to even plan it, let alone all the shuffling
>>>>> and shuffling of huge data. You could tune your way out of it maybe, but
>>>>> not optimistic. It's just huge.
>>>>>
>>>>>
>>>>>
>>>>> You could go off-road and lower-level to take advantage of the
>>>>> structure of the data. You effectively want "column bind". There is no 
>>>>> such
>>>>> operation in Spark. (union is 'row bind'.) You could do this with
>>>>> zipPartition, which is in the RDD API, and to my surprise, not in the
>>>>> Python API but exists in Scala. And R (!). If you can read several RDDs of
>>>>> data, you can use this method to pair all their corresponding values and
>>>>> ultimately get rows of 10114 values out. In fact that is how sparklyr
>>>>> implements cbind on Spark, FWIW:
>>>>> https://rdrr.io/cran/sparklyr/man/sdf_fast_bind_cols.html
>>>>>
>>>>>
>>>>>
>>>>> The issue I see is that you can only zip a few at a time; you don't
>>>>> want to zip 10114 of them. Perhaps you have to do that iteratively, and I
>>>>> don't know if that is going to face the same issues with huge huge plans.
>>>>>
>>>>>
>>>>>
>>>>> I 

Re: How is union() implemented? Need to implement column bind

2022-04-20 Thread Andrew Melo
It would certainly be useful for our domain to have some sort of native
cbind(). Is there a fundamental disapproval of adding that functionality,
or is it just a matter of nobody implementing it?

On Wed, Apr 20, 2022 at 16:28 Sean Owen  wrote:

> Good lead, pandas on Spark concat() is worth trying. It looks like it uses
> a join, but not 100% sure from the source.
> The SQL concat() function is indeed a different thing.
>
> On Wed, Apr 20, 2022 at 3:24 PM Bjørn Jørgensen 
> wrote:
>
>> Sorry for asking. But why does`t concat work?
>>
>> Pandas on spark have ps.concat
>> <https://github.com/apache/spark/blob/1cc2d1641c23f028b5f175f80a695891ff13a6e2/python/pyspark/pandas/namespace.py#L2299>
>>  which
>> takes 2 dataframes and concat them to 1 dataframe.
>> It seems
>> <https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.functions.concat.html#pyspark.sql.functions.concat>
>> like the pyspark version takes 2 columns and concat it to one column.
>>
>> ons. 20. apr. 2022 kl. 21:04 skrev Sean Owen :
>>
>>> cbind? yeah though the answer is typically a join. I don't know if
>>> there's a better option in a SQL engine, as SQL doesn't have anything to
>>> offer except join and pivot either (? right?)
>>> Certainly, the dominant data storage paradigm is wide tables, whereas
>>> you're starting with effectively a huge number of tiny slim tables, which
>>> is the impedance mismatch here.
>>>
>>> On Wed, Apr 20, 2022 at 1:51 PM Andrew Davidson 
>>> wrote:
>>>
>>>> Thanks Sean
>>>>
>>>>
>>>>
>>>> I imagine this is a fairly common problem in data science. Any idea how
>>>> other solve?  For example I wonder if running join something like BigQuery
>>>> might work better? I do not know much about the implementation.
>>>>
>>>>
>>>>
>>>> No one tool will  solve all problems. Once I get the matrix I think it
>>>> spark will work well for our need
>>>>
>>>>
>>>>
>>>> Kind regards
>>>>
>>>>
>>>>
>>>> Andy
>>>>
>>>>
>>>>
>>>> *From: *Sean Owen 
>>>> *Date: *Monday, April 18, 2022 at 6:58 PM
>>>> *To: *Andrew Davidson 
>>>> *Cc: *"user @spark" 
>>>> *Subject: *Re: How is union() implemented? Need to implement column
>>>> bind
>>>>
>>>>
>>>>
>>>> A join is the natural answer, but this is a 10114-way join, which
>>>> probably chokes readily just to even plan it, let alone all the shuffling
>>>> and shuffling of huge data. You could tune your way out of it maybe, but
>>>> not optimistic. It's just huge.
>>>>
>>>>
>>>>
>>>> You could go off-road and lower-level to take advantage of the
>>>> structure of the data. You effectively want "column bind". There is no such
>>>> operation in Spark. (union is 'row bind'.) You could do this with
>>>> zipPartition, which is in the RDD API, and to my surprise, not in the
>>>> Python API but exists in Scala. And R (!). If you can read several RDDs of
>>>> data, you can use this method to pair all their corresponding values and
>>>> ultimately get rows of 10114 values out. In fact that is how sparklyr
>>>> implements cbind on Spark, FWIW:
>>>> https://rdrr.io/cran/sparklyr/man/sdf_fast_bind_cols.html
>>>>
>>>>
>>>>
>>>> The issue I see is that you can only zip a few at a time; you don't
>>>> want to zip 10114 of them. Perhaps you have to do that iteratively, and I
>>>> don't know if that is going to face the same issues with huge huge plans.
>>>>
>>>>
>>>>
>>>> I like the pivot idea. If you can read the individual files as data
>>>> rows (maybe list all the file names, parallelize with Spark, write a UDF
>>>> that reads the data for that file to generate the rows). If you can emit
>>>> (file, index, value) and groupBy index, pivot on file (I think?) that
>>>> should be about it? I think it doesn't need additional hashing or whatever.
>>>> Not sure how fast it is but that seems more direct than the join, as well.
>>>>
>>>>
>>>>
>>>> On Mon, Apr 18, 2022 at 8:27 PM Andrew Davidson
>>>>  wrote:
>>>>
>>>> Hi have a hard problem
>>>

Re: How is union() implemented? Need to implement column bind

2022-04-20 Thread Sean Owen
Good lead, pandas on Spark concat() is worth trying. It looks like it uses
a join, but not 100% sure from the source.
The SQL concat() function is indeed a different thing.

On Wed, Apr 20, 2022 at 3:24 PM Bjørn Jørgensen 
wrote:

> Sorry for asking. But why does`t concat work?
>
> Pandas on spark have ps.concat
> <https://github.com/apache/spark/blob/1cc2d1641c23f028b5f175f80a695891ff13a6e2/python/pyspark/pandas/namespace.py#L2299>
>  which
> takes 2 dataframes and concat them to 1 dataframe.
> It seems
> <https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.functions.concat.html#pyspark.sql.functions.concat>
> like the pyspark version takes 2 columns and concat it to one column.
>
> ons. 20. apr. 2022 kl. 21:04 skrev Sean Owen :
>
>> cbind? yeah though the answer is typically a join. I don't know if
>> there's a better option in a SQL engine, as SQL doesn't have anything to
>> offer except join and pivot either (? right?)
>> Certainly, the dominant data storage paradigm is wide tables, whereas
>> you're starting with effectively a huge number of tiny slim tables, which
>> is the impedance mismatch here.
>>
>> On Wed, Apr 20, 2022 at 1:51 PM Andrew Davidson 
>> wrote:
>>
>>> Thanks Sean
>>>
>>>
>>>
>>> I imagine this is a fairly common problem in data science. Any idea how
>>> other solve?  For example I wonder if running join something like BigQuery
>>> might work better? I do not know much about the implementation.
>>>
>>>
>>>
>>> No one tool will  solve all problems. Once I get the matrix I think it
>>> spark will work well for our need
>>>
>>>
>>>
>>> Kind regards
>>>
>>>
>>>
>>> Andy
>>>
>>>
>>>
>>> *From: *Sean Owen 
>>> *Date: *Monday, April 18, 2022 at 6:58 PM
>>> *To: *Andrew Davidson 
>>> *Cc: *"user @spark" 
>>> *Subject: *Re: How is union() implemented? Need to implement column bind
>>>
>>>
>>>
>>> A join is the natural answer, but this is a 10114-way join, which
>>> probably chokes readily just to even plan it, let alone all the shuffling
>>> and shuffling of huge data. You could tune your way out of it maybe, but
>>> not optimistic. It's just huge.
>>>
>>>
>>>
>>> You could go off-road and lower-level to take advantage of the structure
>>> of the data. You effectively want "column bind". There is no such operation
>>> in Spark. (union is 'row bind'.) You could do this with zipPartition, which
>>> is in the RDD API, and to my surprise, not in the Python API but exists in
>>> Scala. And R (!). If you can read several RDDs of data, you can use this
>>> method to pair all their corresponding values and ultimately get rows of
>>> 10114 values out. In fact that is how sparklyr implements cbind on Spark,
>>> FWIW: https://rdrr.io/cran/sparklyr/man/sdf_fast_bind_cols.html
>>>
>>>
>>>
>>> The issue I see is that you can only zip a few at a time; you don't want
>>> to zip 10114 of them. Perhaps you have to do that iteratively, and I don't
>>> know if that is going to face the same issues with huge huge plans.
>>>
>>>
>>>
>>> I like the pivot idea. If you can read the individual files as data rows
>>> (maybe list all the file names, parallelize with Spark, write a UDF that
>>> reads the data for that file to generate the rows). If you can emit (file,
>>> index, value) and groupBy index, pivot on file (I think?) that should be
>>> about it? I think it doesn't need additional hashing or whatever. Not sure
>>> how fast it is but that seems more direct than the join, as well.
>>>
>>>
>>>
>>> On Mon, Apr 18, 2022 at 8:27 PM Andrew Davidson
>>>  wrote:
>>>
>>> Hi have a hard problem
>>>
>>>
>>>
>>> I have  10114 column vectors each in a separate file. The file has 2
>>> columns, the row id, and numeric values. The row ids are identical and in
>>> sort order. All the column vectors have the same number of rows. There are
>>> over 5 million rows.  I need to combine them into a single table. The row
>>> ids are very long strings. The column names are about 20 chars long.
>>>
>>>
>>>
>>> My current implementation uses join. This takes a long time on a
>>> cluster with 2 works totaling 192 vcpu and 2.8 tb of memory. It often
>>> crashes. I 

Re: How is union() implemented? Need to implement column bind

2022-04-20 Thread Bjørn Jørgensen
Sorry for asking. But why doesn't concat work?

Pandas on Spark has ps.concat
<https://github.com/apache/spark/blob/1cc2d1641c23f028b5f175f80a695891ff13a6e2/python/pyspark/pandas/namespace.py#L2299>,
which takes 2 dataframes and concats them into 1 dataframe.
It seems
<https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.functions.concat.html#pyspark.sql.functions.concat>
like the pyspark version takes 2 columns and concats them into one column.
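For column bind it would be ps.concat(..., axis=1); something like this (just a
sketch, with illustrative paths and column handling):

    import pyspark.pandas as ps

    # read each column vector as a pandas-on-Spark frame indexed by rowid
    frames = [
        ps.read_csv(p, names=["rowid", p]).set_index("rowid")
        for p in ["fileA.csv", "fileB.csv"]
    ]

    # axis=1 concatenates column-wise, aligning on the index (a join under the hood)
    wide = ps.concat(frames, axis=1)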

ons. 20. apr. 2022 kl. 21:04 skrev Sean Owen :

> cbind? yeah though the answer is typically a join. I don't know if there's
> a better option in a SQL engine, as SQL doesn't have anything to offer
> except join and pivot either (? right?)
> Certainly, the dominant data storage paradigm is wide tables, whereas
> you're starting with effectively a huge number of tiny slim tables, which
> is the impedance mismatch here.
>
> On Wed, Apr 20, 2022 at 1:51 PM Andrew Davidson  wrote:
>
>> Thanks Sean
>>
>>
>>
>> I imagine this is a fairly common problem in data science. Any idea how
>> other solve?  For example I wonder if running join something like BigQuery
>> might work better? I do not know much about the implementation.
>>
>>
>>
>> No one tool will  solve all problems. Once I get the matrix I think it
>> spark will work well for our need
>>
>>
>>
>> Kind regards
>>
>>
>>
>> Andy
>>
>>
>>
>> *From: *Sean Owen 
>> *Date: *Monday, April 18, 2022 at 6:58 PM
>> *To: *Andrew Davidson 
>> *Cc: *"user @spark" 
>> *Subject: *Re: How is union() implemented? Need to implement column bind
>>
>>
>>
>> A join is the natural answer, but this is a 10114-way join, which
>> probably chokes readily just to even plan it, let alone all the shuffling
>> and shuffling of huge data. You could tune your way out of it maybe, but
>> not optimistic. It's just huge.
>>
>>
>>
>> You could go off-road and lower-level to take advantage of the structure
>> of the data. You effectively want "column bind". There is no such operation
>> in Spark. (union is 'row bind'.) You could do this with zipPartition, which
>> is in the RDD API, and to my surprise, not in the Python API but exists in
>> Scala. And R (!). If you can read several RDDs of data, you can use this
>> method to pair all their corresponding values and ultimately get rows of
>> 10114 values out. In fact that is how sparklyr implements cbind on Spark,
>> FWIW: https://rdrr.io/cran/sparklyr/man/sdf_fast_bind_cols.html
>>
>>
>>
>> The issue I see is that you can only zip a few at a time; you don't want
>> to zip 10114 of them. Perhaps you have to do that iteratively, and I don't
>> know if that is going to face the same issues with huge huge plans.
>>
>>
>>
>> I like the pivot idea. If you can read the individual files as data rows
>> (maybe list all the file names, parallelize with Spark, write a UDF that
>> reads the data for that file to generate the rows). If you can emit (file,
>> index, value) and groupBy index, pivot on file (I think?) that should be
>> about it? I think it doesn't need additional hashing or whatever. Not sure
>> how fast it is but that seems more direct than the join, as well.
>>
>>
>>
>> On Mon, Apr 18, 2022 at 8:27 PM Andrew Davidson 
>> wrote:
>>
>> Hi have a hard problem
>>
>>
>>
>> I have  10114 column vectors each in a separate file. The file has 2
>> columns, the row id, and numeric values. The row ids are identical and in
>> sort order. All the column vectors have the same number of rows. There are
>> over 5 million rows.  I need to combine them into a single table. The row
>> ids are very long strings. The column names are about 20 chars long.
>>
>>
>>
>> My current implementation uses join. This takes a long time on a cluster
>> with 2 works totaling 192 vcpu and 2.8 tb of memory. It often crashes. I
>> mean totally dead start over. Checkpoints do not seem  help, It still
>> crashes and need to be restarted from scratch. What is really surprising
>> is the final file size is only 213G ! The way got the file  was to copy
>> all the column vectors to a single BIG IRON machine and used unix cut and
>> paste. Took about 44 min to run once I got all the data moved around. It
>> was very tedious and error prone. I had to move a lot data around. Not a
>> particularly reproducible process. I will need to rerun this three more
>> times on different data sets of about the same size
>>
>>
>>
>> I notic

Re: How is union() implemented? Need to implement column bind

2022-04-20 Thread Sean Owen
cbind? yeah though the answer is typically a join. I don't know if there's
a better option in a SQL engine, as SQL doesn't have anything to offer
except join and pivot either (? right?)
Certainly, the dominant data storage paradigm is wide tables, whereas
you're starting with effectively a huge number of tiny slim tables, which
is the impedance mismatch here.

On Wed, Apr 20, 2022 at 1:51 PM Andrew Davidson  wrote:

> Thanks Sean
>
>
>
> I imagine this is a fairly common problem in data science. Any idea how
> other solve?  For example I wonder if running join something like BigQuery
> might work better? I do not know much about the implementation.
>
>
>
> No one tool will  solve all problems. Once I get the matrix I think it
> spark will work well for our need
>
>
>
> Kind regards
>
>
>
> Andy
>
>
>
> *From: *Sean Owen 
> *Date: *Monday, April 18, 2022 at 6:58 PM
> *To: *Andrew Davidson 
> *Cc: *"user @spark" 
> *Subject: *Re: How is union() implemented? Need to implement column bind
>
>
>
> A join is the natural answer, but this is a 10114-way join, which probably
> chokes readily just to even plan it, let alone all the shuffling and
> shuffling of huge data. You could tune your way out of it maybe, but not
> optimistic. It's just huge.
>
>
>
> You could go off-road and lower-level to take advantage of the structure
> of the data. You effectively want "column bind". There is no such operation
> in Spark. (union is 'row bind'.) You could do this with zipPartition, which
> is in the RDD API, and to my surprise, not in the Python API but exists in
> Scala. And R (!). If you can read several RDDs of data, you can use this
> method to pair all their corresponding values and ultimately get rows of
> 10114 values out. In fact that is how sparklyr implements cbind on Spark,
> FWIW: https://rdrr.io/cran/sparklyr/man/sdf_fast_bind_cols.html
>
>
>
> The issue I see is that you can only zip a few at a time; you don't want
> to zip 10114 of them. Perhaps you have to do that iteratively, and I don't
> know if that is going to face the same issues with huge huge plans.
>
>
>
> I like the pivot idea. If you can read the individual files as data rows
> (maybe list all the file names, parallelize with Spark, write a UDF that
> reads the data for that file to generate the rows). If you can emit (file,
> index, value) and groupBy index, pivot on file (I think?) that should be
> about it? I think it doesn't need additional hashing or whatever. Not sure
> how fast it is but that seems more direct than the join, as well.
>
>
>
> On Mon, Apr 18, 2022 at 8:27 PM Andrew Davidson 
> wrote:
>
> Hi have a hard problem
>
>
>
> I have  10114 column vectors each in a separate file. The file has 2
> columns, the row id, and numeric values. The row ids are identical and in
> sort order. All the column vectors have the same number of rows. There are
> over 5 million rows.  I need to combine them into a single table. The row
> ids are very long strings. The column names are about 20 chars long.
>
>
>
> My current implementation uses join. This takes a long time on a cluster
> with 2 works totaling 192 vcpu and 2.8 tb of memory. It often crashes. I
> mean totally dead start over. Checkpoints do not seem  help, It still
> crashes and need to be restarted from scratch. What is really surprising
> is the final file size is only 213G ! The way got the file  was to copy
> all the column vectors to a single BIG IRON machine and used unix cut and
> paste. Took about 44 min to run once I got all the data moved around. It
> was very tedious and error prone. I had to move a lot data around. Not a
> particularly reproducible process. I will need to rerun this three more
> times on different data sets of about the same size
>
>
>
> I noticed that spark has a union function(). It implements row bind. Any
> idea how it is implemented? Is it just map reduce under the covers?
>
>
>
> My thought was
>
> 1.  load each col vector
>
> 2.  maybe I need to replace the really long row id strings with
> integers
>
> 3.  convert column vectors into row vectors using piviot (Ie matrix
> transpose.)
>
> 4.  union all the row vectors into a single table
>
> 5.  piviot the table back so I have the correct column vectors
>
>
>
> I could replace the row ids and column name with integers if needed, and
> restore them later
>
>
>
> Maybe I would be better off using many small machines? I assume memory is
> the limiting resource not cpu. I notice that memory usage will reach 100%.
> I added several TB’s of local ssd. I am not convinced that spark is using
>

Re: How is union() implemented? Need to implement column bind

2022-04-20 Thread Andrew Davidson
Thanks Sean

I imagine this is a fairly common problem in data science. Any idea how others
solve it? For example, I wonder if running the join in something like BigQuery
might work better? I do not know much about the implementation.

No one tool will solve all problems. Once I get the matrix, I think Spark
will work well for our needs.

Kind regards

Andy

From: Sean Owen 
Date: Monday, April 18, 2022 at 6:58 PM
To: Andrew Davidson 
Cc: "user @spark" 
Subject: Re: How is union() implemented? Need to implement column bind

A join is the natural answer, but this is a 10114-way join, which probably 
chokes readily just to even plan it, let alone all the shuffling and shuffling 
of huge data. You could tune your way out of it maybe, but not optimistic. It's 
just huge.

You could go off-road and lower-level to take advantage of the structure of the 
data. You effectively want "column bind". There is no such operation in Spark. 
(union is 'row bind'.) You could do this with zipPartition, which is in the RDD 
API, and to my surprise, not in the Python API but exists in Scala. And R (!). 
If you can read several RDDs of data, you can use this method to pair all their 
corresponding values and ultimately get rows of 10114 values out. In fact that 
is how sparklyr implements cbind on Spark, FWIW: 
https://rdrr.io/cran/sparklyr/man/sdf_fast_bind_cols.html

The issue I see is that you can only zip a few at a time; you don't want to zip 
10114 of them. Perhaps you have to do that iteratively, and I don't know if 
that is going to face the same issues with huge huge plans.

I like the pivot idea. If you can read the individual files as data rows (maybe 
list all the file names, parallelize with Spark, write a UDF that reads the 
data for that file to generate the rows). If you can emit (file, index, value) 
and groupBy index, pivot on file (I think?) that should be about it? I think it 
doesn't need additional hashing or whatever. Not sure how fast it is but that 
seems more direct than the join, as well.

On Mon, Apr 18, 2022 at 8:27 PM Andrew Davidson  
wrote:
Hi have a hard problem

I have  10114 column vectors each in a separate file. The file has 2 columns, 
the row id, and numeric values. The row ids are identical and in sort order. 
All the column vectors have the same number of rows. There are over 5 million 
rows.  I need to combine them into a single table. The row ids are very long 
strings. The column names are about 20 chars long.

My current implementation uses join. This takes a long time on a cluster with 2 
works totaling 192 vcpu and 2.8 tb of memory. It often crashes. I mean totally 
dead start over. Checkpoints do not seem  help, It still crashes and need to be 
restarted from scratch. What is really surprising is the final file size is 
only 213G ! The way got the file  was to copy all the column vectors to a 
single BIG IRON machine and used unix cut and paste. Took about 44 min to run 
once I got all the data moved around. It was very tedious and error prone. I 
had to move a lot data around. Not a particularly reproducible process. I will 
need to rerun this three more times on different data sets of about the same 
size

I noticed that spark has a union function(). It implements row bind. Any idea 
how it is implemented? Is it just map reduce under the covers?

My thought was

1.  load each col vector

2.  maybe I need to replace the really long row id strings with integers

3.  convert column vectors into row vectors using piviot (Ie matrix 
transpose.)

4.  union all the row vectors into a single table

5.  piviot the table back so I have the correct column vectors


I could replace the row ids and column name with integers if needed, and 
restore them later

Maybe I would be better off using many small machines? I assume memory is the 
limiting resource not cpu. I notice that memory usage will reach 100%. I added 
several TB’s of local ssd. I am not convinced that spark is using the local disk


will this perform better than join?


· The rows  before the final pivot will be very very wide (over 5 million 
columns)

· There will only be 10114 rows before the pivot

I assume the pivots will shuffle all the data. I assume the Colum vectors are 
trivial. The file table pivot will be expensive however will only need to be 
done once



Comments and suggestions appreciated

Andy




Re: How is union() implemented? Need to implement column bind

2022-04-18 Thread Sean Owen
A join is the natural answer, but this is a 10114-way join, which probably
chokes readily just to even plan it, let alone all the shuffling and
shuffling of huge data. You could tune your way out of it maybe, but not
optimistic. It's just huge.

You could go off-road and lower-level to take advantage of the structure of
the data. You effectively want "column bind". There is no such operation in
Spark. (union is 'row bind'.) You could do this with zipPartition, which is
in the RDD API, and to my surprise, not in the Python API but exists in
Scala. And R (!). If you can read several RDDs of data, you can use this
method to pair all their corresponding values and ultimately get rows of
10114 values out. In fact that is how sparklyr implements cbind on Spark,
FWIW: https://rdrr.io/cran/sparklyr/man/sdf_fast_bind_cols.html

The issue I see is that you can only zip a few at a time; you don't want to
zip 10114 of them. Perhaps you have to do that iteratively, and I don't
know if that is going to face the same issues with huge huge plans.
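The closest primitive PySpark does expose is plain RDD.zip, which pairs elements
partition-by-partition without a shuffle. A toy sketch of the idea (made-up data;
it requires identical partition counts and per-partition sizes):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    sc = spark.sparkContext

    # two toy "column vectors", loaded with the same partitioning and order
    col_a = sc.parallelize([1.0, 2.0, 3.0, 4.0], 2)
    col_b = sc.parallelize([5.0, 6.0, 7.0, 8.0], 2)

    # zip pairs corresponding elements; repeated zips widen the tuples a few
    # columns at a time, which is the iterative flavor described above
    pairs = col_a.zip(col_b)
    print(pairs.collect())   # [(1.0, 5.0), (2.0, 6.0), (3.0, 7.0), (4.0, 8.0)]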

I like the pivot idea. If you can read the individual files as data rows
(maybe list all the file names, parallelize with Spark, write a UDF that
reads the data for that file to generate the rows). If you can emit (file,
index, value) and groupBy index, pivot on file (I think?) that should be
about it? I think it doesn't need additional hashing or whatever. Not sure
how fast it is but that seems more direct than the join, as well.
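Roughly, as an untested sketch (the file listing, parsing, and paths are made up,
and it assumes the workers can open each file path):

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.getOrCreate()

    file_names = ["sampleA.tsv", "sampleB.tsv"]   # really the 10114 paths

    def read_one(path):
        # emit (file, rowid, value) for every line of one column-vector file
        with open(path) as f:
            for line in f:
                rowid, value = line.split()
                yield (path, rowid, float(value))

    rows = spark.sparkContext.parallelize(file_names, len(file_names)).flatMap(read_one)
    long_df = rows.toDF(["file", "rowid", "value"])

    # one row per rowid, one column per file
    wide = long_df.groupBy("rowid").pivot("file", file_names).agg(F.first("value"))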

On Mon, Apr 18, 2022 at 8:27 PM Andrew Davidson 
wrote:

> Hi have a hard problem
>
>
>
> I have  10114 column vectors each in a separate file. The file has 2
> columns, the row id, and numeric values. The row ids are identical and in
> sort order. All the column vectors have the same number of rows. There are
> over 5 million rows.  I need to combine them into a single table. The row
> ids are very long strings. The column names are about 20 chars long.
>
>
>
> My current implementation uses join. This takes a long time on a cluster
> with 2 works totaling 192 vcpu and 2.8 tb of memory. It often crashes. I
> mean totally dead start over. Checkpoints do not seem  help, It still
> crashes and need to be restarted from scratch. What is really surprising
> is the final file size is only 213G ! The way got the file  was to copy
> all the column vectors to a single BIG IRON machine and used unix cut and
> paste. Took about 44 min to run once I got all the data moved around. It
> was very tedious and error prone. I had to move a lot data around. Not a
> particularly reproducible process. I will need to rerun this three more
> times on different data sets of about the same size
>
>
>
> I noticed that spark has a union function(). It implements row bind. Any
> idea how it is implemented? Is it just map reduce under the covers?
>
>
>
> My thought was
>
>1. load each col vector
>2. maybe I need to replace the really long row id strings with integers
>3. convert column vectors into row vectors using piviot (Ie matrix
>transpose.)
>4. union all the row vectors into a single table
>5. piviot the table back so I have the correct column vectors
>
>
>
> I could replace the row ids and column name with integers if needed, and
> restore them later
>
>
>
> Maybe I would be better off using many small machines? I assume memory is
> the limiting resource not cpu. I notice that memory usage will reach 100%.
> I added several TB’s of local ssd. I am not convinced that spark is using
> the local disk
>
>
>
>
>
> will this perform better than join?
>
>
>
>- The rows  before the final pivot will be very very wide (over 5
>million columns)
>- There will only be 10114 rows before the pivot
>
>
>
> I assume the pivots will shuffle all the data. I assume the Colum vectors
> are trivial. The file table pivot will be expensive however will only need
> to be done once
>
>
>
>
>
>
>
> Comments and suggestions appreciated
>
>
>
> Andy
>
>
>
>
>


How is union() implemented? Need to implement column bind

2022-04-18 Thread Andrew Davidson
Hi, I have a hard problem

I have 10114 column vectors, each in a separate file. Each file has 2 columns,
the row id and a numeric value. The row ids are identical and in sort order.
All the column vectors have the same number of rows. There are over 5 million
rows. I need to combine them into a single table. The row ids are very long
strings. The column names are about 20 chars long.

My current implementation uses join. This takes a long time on a cluster with 2
workers totaling 192 vCPUs and 2.8 TB of memory. It often crashes. I mean totally
dead, start over. Checkpoints do not seem to help; it still crashes and needs to
be restarted from scratch. What is really surprising is that the final file size
is only 213 GB! The way I got the file was to copy all the column vectors to a
single big-iron machine and use unix cut and paste. That took about 44 min to run
once I got all the data moved around. It was very tedious and error prone. I had
to move a lot of data around. Not a particularly reproducible process. I will
need to rerun this three more times on different data sets of about the same
size.

I noticed that Spark has a union() function. It implements row bind. Any idea
how it is implemented? Is it just map/reduce under the covers?

My thought was

  1.  load each col vector
  2.  maybe I need to replace the really long row id strings with integers
  3.  convert column vectors into row vectors using pivot (i.e. matrix
transpose)
  4.  union all the row vectors into a single table
  5.  pivot the table back so I have the correct column vectors


I could replace the row ids and column names with integers if needed, and
restore them later.
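For example, something like this to build the integer mapping (just a sketch;
the path and schema are made up):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # the long row-id strings, taken from any one of the column-vector files
    one_file = spark.read.csv("sampleA.tsv", sep="\t",
                              schema="rowid STRING, value DOUBLE")

    # assign a stable integer per rowid and keep the mapping so the original
    # strings can be restored at the end
    id_map = (one_file.select("rowid").distinct()
                      .rdd.zipWithIndex()
                      .map(lambda kv: (kv[0]["rowid"], kv[1]))
                      .toDF(["rowid", "rowid_int"]))
    id_map.write.mode("overwrite").parquet("rowid_map.parquet")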

Maybe I would be better off using many small machines? I assume memory is the
limiting resource, not CPU. I notice that memory usage will reach 100%. I added
several TBs of local SSD. I am not convinced that Spark is using the local disk.


Will this perform better than join?


  *   The rows before the final pivot will be very, very wide (over 5 million
columns)
  *   There will only be 10114 rows before the pivot

I assume the pivots will shuffle all the data. I assume the column-vector pivots
are trivial; the file-table pivot will be expensive, however it will only need to
be done once.



Comments and suggestions appreciated

Andy