I don't think there's fundamental disapproval (it is implemented in
sparklyr) just a question of how you make this work at scale in general.
It's not a super natural operation in this context but can be done. If you
find a successful solution at extremes then maybe it generalizes.

On Wed, Apr 20, 2022 at 4:29 PM Andrew Melo <andrew.m...@gmail.com> wrote:

> It would certainly be useful for our domain to have some sort of native
> cbind(). Is there a fundamental disapproval of adding that functionality,
> or is it just a matter of nobody implementing it?
>
> On Wed, Apr 20, 2022 at 16:28 Sean Owen <sro...@gmail.com> wrote:
>
>> Good lead, pandas on Spark concat() is worth trying. It looks like it
>> uses a join, but not 100% sure from the source.
>> The SQL concat() function is indeed a different thing.
>>
>> On Wed, Apr 20, 2022 at 3:24 PM Bjørn Jørgensen <bjornjorgen...@gmail.com>
>> wrote:
>>
>>> Sorry for asking. But why does`t concat work?
>>>
>>> Pandas on spark have ps.concat
>>> <https://github.com/apache/spark/blob/1cc2d1641c23f028b5f175f80a695891ff13a6e2/python/pyspark/pandas/namespace.py#L2299>
>>>  which
>>> takes 2 dataframes and concat them to 1 dataframe.
>>> It seems
>>> <https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.functions.concat.html#pyspark.sql.functions.concat>
>>> like the pyspark version takes 2 columns and concat it to one column.
>>>
>>> ons. 20. apr. 2022 kl. 21:04 skrev Sean Owen <sro...@gmail.com>:
>>>
>>>> cbind? yeah though the answer is typically a join. I don't know if
>>>> there's a better option in a SQL engine, as SQL doesn't have anything to
>>>> offer except join and pivot either (? right?)
>>>> Certainly, the dominant data storage paradigm is wide tables, whereas
>>>> you're starting with effectively a huge number of tiny slim tables, which
>>>> is the impedance mismatch here.
>>>>
>>>> On Wed, Apr 20, 2022 at 1:51 PM Andrew Davidson <aedav...@ucsc.edu>
>>>> wrote:
>>>>
>>>>> Thanks Sean
>>>>>
>>>>>
>>>>>
>>>>> I imagine this is a fairly common problem in data science. Any idea
>>>>> how other solve?  For example I wonder if running join something like
>>>>> BigQuery might work better? I do not know much about the implementation.
>>>>>
>>>>>
>>>>>
>>>>> No one tool will  solve all problems. Once I get the matrix I think it
>>>>> spark will work well for our need
>>>>>
>>>>>
>>>>>
>>>>> Kind regards
>>>>>
>>>>>
>>>>>
>>>>> Andy
>>>>>
>>>>>
>>>>>
>>>>> *From: *Sean Owen <sro...@gmail.com>
>>>>> *Date: *Monday, April 18, 2022 at 6:58 PM
>>>>> *To: *Andrew Davidson <aedav...@ucsc.edu>
>>>>> *Cc: *"user @spark" <user@spark.apache.org>
>>>>> *Subject: *Re: How is union() implemented? Need to implement column
>>>>> bind
>>>>>
>>>>>
>>>>>
>>>>> A join is the natural answer, but this is a 10114-way join, which
>>>>> probably chokes readily just to even plan it, let alone all the shuffling
>>>>> and shuffling of huge data. You could tune your way out of it maybe, but
>>>>> not optimistic. It's just huge.
>>>>>
>>>>>
>>>>>
>>>>> You could go off-road and lower-level to take advantage of the
>>>>> structure of the data. You effectively want "column bind". There is no 
>>>>> such
>>>>> operation in Spark. (union is 'row bind'.) You could do this with
>>>>> zipPartition, which is in the RDD API, and to my surprise, not in the
>>>>> Python API but exists in Scala. And R (!). If you can read several RDDs of
>>>>> data, you can use this method to pair all their corresponding values and
>>>>> ultimately get rows of 10114 values out. In fact that is how sparklyr
>>>>> implements cbind on Spark, FWIW:
>>>>> https://rdrr.io/cran/sparklyr/man/sdf_fast_bind_cols.html
>>>>>
>>>>>
>>>>>
>>>>> The issue I see is that you can only zip a few at a time; you don't
>>>>> want to zip 10114 of them. Perhaps you have to do that iteratively, and I
>>>>> don't know if that is going to face the same issues with huge huge plans.
>>>>>
>>>>>
>>>>>
>>>>> I like the pivot idea. If you can read the individual files as data
>>>>> rows (maybe list all the file names, parallelize with Spark, write a UDF
>>>>> that reads the data for that file to generate the rows). If you can emit
>>>>> (file, index, value) and groupBy index, pivot on file (I think?) that
>>>>> should be about it? I think it doesn't need additional hashing or 
>>>>> whatever.
>>>>> Not sure how fast it is but that seems more direct than the join, as well.
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Apr 18, 2022 at 8:27 PM Andrew Davidson
>>>>> <aedav...@ucsc.edu.invalid> wrote:
>>>>>
>>>>> Hi have a hard problem
>>>>>
>>>>>
>>>>>
>>>>> I have  10114 column vectors each in a separate file. The file has 2
>>>>> columns, the row id, and numeric values. The row ids are identical and in
>>>>> sort order. All the column vectors have the same number of rows. There are
>>>>> over 5 million rows.  I need to combine them into a single table. The row
>>>>> ids are very long strings. The column names are about 20 chars long.
>>>>>
>>>>>
>>>>>
>>>>> My current implementation uses join. This takes a long time on a
>>>>> cluster with 2 works totaling 192 vcpu and 2.8 tb of memory. It often
>>>>> crashes. I mean totally dead start over. Checkpoints do not seem  help, It
>>>>> still crashes and need to be restarted from scratch. What is really
>>>>> surprising is the final file size is only 213G ! The way got the file
>>>>>  was to copy all the column vectors to a single BIG IRON machine and used
>>>>> unix cut and paste. Took about 44 min to run once I got all the data moved
>>>>> around. It was very tedious and error prone. I had to move a lot data
>>>>> around. Not a particularly reproducible process. I will need to rerun
>>>>> this three more times on different data sets of about the same size
>>>>>
>>>>>
>>>>>
>>>>> I noticed that spark has a union function(). It implements row bind.
>>>>> Any idea how it is implemented? Is it just map reduce under the covers?
>>>>>
>>>>>
>>>>>
>>>>> My thought was
>>>>>
>>>>> 1.      load each col vector
>>>>>
>>>>> 2.      maybe I need to replace the really long row id strings with
>>>>> integers
>>>>>
>>>>> 3.      convert column vectors into row vectors using piviot (Ie
>>>>> matrix transpose.)
>>>>>
>>>>> 4.      union all the row vectors into a single table
>>>>>
>>>>> 5.      piviot the table back so I have the correct column vectors
>>>>>
>>>>>
>>>>>
>>>>> I could replace the row ids and column name with integers if needed,
>>>>> and restore them later
>>>>>
>>>>>
>>>>>
>>>>> Maybe I would be better off using many small machines? I assume memory
>>>>> is the limiting resource not cpu. I notice that memory usage will reach
>>>>> 100%. I added several TB’s of local ssd. I am not convinced that spark is
>>>>> using the local disk
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> will this perform better than join?
>>>>>
>>>>>
>>>>>
>>>>> · The rows  before the final pivot will be very very wide (over 5
>>>>> million columns)
>>>>>
>>>>> · There will only be 10114 rows before the pivot
>>>>>
>>>>>
>>>>>
>>>>> I assume the pivots will shuffle all the data. I assume the Colum
>>>>> vectors are trivial. The file table pivot will be expensive however will
>>>>> only need to be done once
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> Comments and suggestions appreciated
>>>>>
>>>>>
>>>>>
>>>>> Andy
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>
>>> --
>>> Bjørn Jørgensen
>>> Vestre Aspehaug 4
>>> <https://www.google.com/maps/search/Vestre+Aspehaug+4?entry=gmail&source=g>,
>>> 6010 Ålesund
>>> Norge
>>>
>>> +47 480 94 297
>>>
>> --
> It's dark in this basement.
>

Reply via email to