Re: [Spark Core] Vectorizing very high-dimensional data sourced in long format

2020-10-30 Thread Daniel Chalef
Yes, the resulting matrix would be sparse. Thanks for the suggestion. Will
explore ways of doing this using an agg and UDF.
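
A minimal sketch of the agg-plus-UDF approach (the names NUM_ATTRIBUTES
and to_sparse_vector are illustrative, and it assumes attribute_id
already indexes a contiguous [0, NUM_ATTRIBUTES) range) might look like:

from pyspark.sql import functions as F
from pyspark.ml.linalg import SparseVector, VectorUDT

NUM_ATTRIBUTES = 130_000_000  # hypothetical: total attribute_id range after indexing

@F.udf(returnType=VectorUDT())
def to_sparse_vector(pairs):
    # SparseVector requires strictly increasing indices, so sort by attribute_id
    sorted_pairs = sorted(
        (p["attribute_id"], float(p["event_count"]))
        for p in pairs
        if p["event_count"] is not None
    )
    indices = [i for i, _ in sorted_pairs]
    values = [v for _, v in sorted_pairs]
    return SparseVector(NUM_ATTRIBUTES, indices, values)

vectors = (
    long_frame.groupBy("entity_id")
    .agg(F.collect_list(F.struct("attribute_id", "event_count")).alias("pairs"))
    .withColumn("features", to_sparse_vector("pairs"))
    .select("entity_id", "features")
)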

On Fri, Oct 30, 2020 at 6:26 AM Patrick McCarthy
 wrote:

> That's a very large vector. Is it sparse? Perhaps you'd have better luck
> performing an aggregate instead of a pivot, and assembling the vector using
> a UDF.
>
> On Thu, Oct 29, 2020 at 10:19 PM Daniel Chalef
>  wrote:
>
>> Hello,
>>
>> I have a very large long-format dataframe (several billion rows) that I'd
>> like to pivot and vectorize (using the VectorAssembler), with the aim to
>> reduce dimensionality using something akin to TF-IDF. Once pivoted, the
>> dataframe will have ~130 million columns.
>>
>> The source, long-format schema looks as follows:
>>
>> root
>>  |-- entity_id: long (nullable = false)
>>  |-- attribute_id: long (nullable = false)
>>  |-- event_count: integer (nullable = true)
>>
>> Pivoting as per the following fails, exhausting executor and driver
>> memory. I am unsure whether increasing memory limits would be successful
>> here as my sense is that pivoting and then using a VectorAssembler isn't
>> the right approach to solving this problem.
>>
>> wide_frame = (
>> long_frame.groupBy("entity_id")
>> .pivot("attribute_id")
>> .agg(F.first("event_count"))
>> )
>>
>> Are there other Spark patterns that I should attempt in order to achieve
>> my end goal of a vector of attributes for every entity?
>>
>> Thanks, Daniel
>>
>
>
> --
>
>
> Patrick McCarthy
>
> Senior Data Scientist, Machine Learning Engineering
>
> Dstillery
>
> 470 Park Ave South, 17th Floor, NYC 10016
>


Re: Debugging tools for Spark Structured Streaming

2020-10-30 Thread Artemis User
Spark distributes load to executors, and each executor is usually
pre-configured with a number of cores. You may want to check with your
Spark admin on how many executors (or workers) your Spark cluster is
configured with and how many cores each executor is given. The main
debugging tool for performance tuning in Spark is the built-in Web UI.
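
A quick way to check these settings from a running PySpark session (a
minimal sketch; which keys are actually set depends on your deployment)
is:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Rough view of the parallelism available to the scheduler
# (typically the total number of executor cores)
print(spark.sparkContext.defaultParallelism)

# Executor settings, if they were set explicitly for this application
print(spark.conf.get("spark.executor.instances", "not set"))
print(spark.conf.get("spark.executor.cores", "not set"))

# Number of partitions used after shuffles (default 200)
print(spark.conf.get("spark.sql.shuffle.partitions"))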


The level of parallelism in structured streaming isn't as
straightforward as in standard ETL processing. It depends on the data
source, the streaming mode (continuous or micro-batch), your trigger
timing, etc. We have experienced similar scaling problems with
structured streaming. Please note that Spark is designed for processing
large chunks of data, not for streaming data one piece at a time. It
doesn't like small pieces of data (the default partition size is 128
MB), period! The partition mechanism and the RDD-driven DAG job
scheduler are all designed for large-scale ETL. Streaming data has to
accumulate into a large chunk before scaling can take place. Spark also
can't distribute the read operation (only one worker reads, which has
to do with preserving the order of the stream), so data ingestion
becomes a bottleneck that prevents scaling further down the chain. An
alternative may be to look into other streaming frameworks, like
Apache Ignite.
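
If the single reader turns out to be your bottleneck, one thing to try
(a minimal sketch, assuming a Kafka source with placeholder broker and
topic names) is to repartition right after the read so that downstream
stages can use all of your cores:

stream = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")  # placeholder address
    .option("subscribe", "events")  # placeholder topic
    .load()
    # A single reader task still pulls the data, but repartitioning lets
    # the subsequent stages run in parallel across executors.
    .repartition(128)
)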


-- ND

On 10/29/20 8:02 PM, Eric Beabes wrote:
We're using Spark 2.4. We recently pushed to production a product that
uses Spark Structured Streaming. It's working well most of the time,
but occasionally, when the load is high, we've noticed that there are
only 10+ 'Active Tasks' even though we've provided 128 cores. We would
like to debug this further. Why aren't all the cores being used? How do
we debug this? Please help. Thanks.





Re: [Spark Core] Vectorizing very high-dimensional data sourced in long format

2020-10-30 Thread Patrick McCarthy
That's a very large vector. Is it sparse? Perhaps you'd have better luck
performing an aggregate instead of a pivot, and assembling the vector using
a UDF.

On Thu, Oct 29, 2020 at 10:19 PM Daniel Chalef
 wrote:

> Hello,
>
> I have a very large long-format dataframe (several billion rows) that I'd
> like to pivot and vectorize (using the VectorAssembler), with the aim to
> reduce dimensionality using something akin to TF-IDF. Once pivoted, the
> dataframe will have ~130 million columns.
>
> The source, long-format schema looks as follows:
>
> root
>  |-- entity_id: long (nullable = false)
>  |-- attribute_id: long (nullable = false)
>  |-- event_count: integer (nullable = true)
>
> Pivoting as per the following fails, exhausting executor and driver
> memory. I am unsure whether increasing memory limits would be successful
> here as my sense is that pivoting and then using a VectorAssembler isn't
> the right approach to solving this problem.
>
> wide_frame = (
> long_frame.groupBy("entity_id")
> .pivot("attribute_id")
> .agg(F.first("event_count"))
> )
>
> Are there other Spark patterns that I should attempt in order to achieve
> my end goal of a vector of attributes for every entity?
>
> Thanks, Daniel
>


-- 


Patrick McCarthy

Senior Data Scientist, Machine Learning Engineering

Dstillery

470 Park Ave South, 17th Floor, NYC 10016