Re: How to connect Tableau to databricks spark?

2017-01-08 Thread Jörn Franke
Firewall Ports open? 
Hint: for security reasons you should not connect via the internet.

> On 9 Jan 2017, at 04:30, Raymond Xie  wrote:
> 
> I want to do some data analytics work by leveraging Databricks spark platform 
> and connect my Tableau desktop to it for data visualization.
> 
> Has anyone managed to do it? I've been trying to follow the instructions below
> but have not been successful:
> 
> https://docs.cloud.databricks.com/docs/latest/databricks_guide/01%20Databricks%20Overview/14%20Third%20Party%20Integrations/01%20Setup%20JDBC%20or%20ODBC.html
> 
> 
> I got an error message in Tableau's attempt to connect:
> 
> Unable to connect to the server 
> "ec2-35-160-128-113.us-west-2.compute.amazonaws.com". Check that the server 
> is running and that you have access privileges to the requested database.
> 
> "ec2-35-160-128-113.us-west-2.compute.amazonaws.com" is the hostname of a EC2 
> instance I just created on AWS, I may have some missing there though as I am 
> new to AWS. 
> 
> I am not sure whether this is related to an account issue; I was using my
> Databricks account in Tableau to connect.
> 
> Thank you very much. Any clue is appreciated.
> 
> 
> Sincerely yours,
> 
> 
> Raymond


Re: Unable to explain the job kicked off for spark.read.csv

2017-01-08 Thread Hyukjin Kwon
Oh, I mean another job would *not* happen if the schema is explicitly given.
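For reference, a rough sketch of what I mean (the column names and types here
are placeholders, not the real layout of that file): with an explicit schema,
Spark does not need to inspect the data up front.

import org.apache.spark.sql.types._

// hypothetical schema; replace the fields with the file's actual columns
val schema = StructType(Seq(
  StructField("timestamp", StringType),
  StructField("site", StringType),
  StructField("requests", IntegerType)
))

// no extra job should be triggered here, because neither the header nor the
// field count has to be discovered from the first line
val baseDF = spark.read
  .schema(schema)
  .csv("s3://l4b-d4t4/wikipedia/pageviews-by-second-tsv")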

2017-01-09 16:37 GMT+09:00 Hyukjin Kwon :

> Hi Appu,
>
>
> I believe that textFile and filter came from...
>
> https://github.com/apache/spark/blob/branch-2.1/sql/
> core/src/main/scala/org/apache/spark/sql/execution/
> datasources/csv/CSVFileFormat.scala#L59-L61
>
>
> It needs to read the first line even when the header is disabled and
> schema inference is disabled, because we still need a default string
> schema
>
> with the same number of fields as the first row, named "_c#" where # is
> the field's position, whenever the schema is not specified manually.
>
> I believe that another job would happen if the schema is explicitly given
>
>
> I hope this is helpful
>
>
> Thanks.
>
> 2017-01-09 0:11 GMT+09:00 Appu K :
>
>> I was trying to create a base-data-frame in an EMR cluster from a csv
>> file using
>>
>> val baseDF = spark.read.csv("s3://l4b-d4t4/wikipedia/pageviews-by-second-
>> tsv")
>>
>> I omitted the options to infer the schema and specify the header, just to
>> understand what happens behind the scenes.
>>
>>
>> The Spark UI shows that this kicked off a job with one stage. The stage
>> shows that a filter was applied.
>>
>> I got a little curious about this. Is there any place where I could better
>> understand why a filter was applied here and why there was an action in this
>> case?
>>
>>
>> thanks
>>
>>
>


Re: Unable to explain the job kicked off for spark.read.csv

2017-01-08 Thread Hyukjin Kwon
Hi Appu,


I believe that textFile and filter came from...

https://github.com/apache/spark/blob/branch-2.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala#L59-L61


It needs to read the first line even when the header is disabled and
schema inference is disabled, because we still need a default string
schema

with the same number of fields as the first row, named "_c#" where # is
the field's position, whenever the schema is not specified manually.

I believe that another job would happen if the schema is explicitly given


I hope this is helpful


Thanks.

2017-01-09 0:11 GMT+09:00 Appu K :

> I was trying to create a base-data-frame in an EMR cluster from a csv file
> using
>
> val baseDF = spark.read.csv("s3://l4b-d4t4/wikipedia/pageviews-by-second-
> tsv")
>
> I omitted the options to infer the schema and specify the header, just to
> understand what happens behind the scenes.
>
>
> The Spark UI shows that this kicked off a job with one stage. The stage
> shows that a filter was applied.
>
> I got a little curious about this. Is there any place where I could better
> understand why a filter was applied here and why there was an action in this
> case?
>
>
> thanks
>
>


Re: Docker image for Spark streaming app

2017-01-08 Thread shyla deshpande
Just want to clarify.

I want to run a single node spark cluster in a docker container. I want to
use Spark 2.0+ and scala.
I am looking for a Docker image from Docker Hub. I want this setup for
development and testing purposes.

Please let me know if you know of any docker image that can help me get
started.

Thanks

On Sun, Jan 8, 2017 at 1:52 PM, shyla deshpande 
wrote:

> Thanks really appreciate.
>
> On Sun, Jan 8, 2017 at 1:02 PM, vvshvv  wrote:
>
>> Hi,
>>
>> I am running spark streaming job using spark jobserver via this image:
>>
>> https://hub.docker.com/r/depend/spark-jobserver/.
>>
>> It works well in standalone mode (when using Mesos, the job does not make
>> progress). The Spark Jobserver that supports Spark 2.0 has a new API that is
>> only suitable for non-streaming jobs; for streaming you have to specify a
>> StreamingContextFactory that still uses the old API, so you can just use the
>> old API for now.
>>
>>
>>
>> Sent from my Mi phone
>> On Jan 8, 2017 11:51 PM, shyla deshpande wrote:
>>
>> I am looking for a Docker image that I can use from Docker Hub for running a
>> Spark streaming app with Scala and Spark 2.0+.
>>
>> I am new to Docker and unable to find an image on Docker Hub that suits my
>> needs. Please let me know if anyone is using Docker for a Spark streaming app,
>> and share your experience.
>>
>> Thanks
>>
>>
>


Re: handling of empty partitions

2017-01-08 Thread Holden Karau
Hi Georg,

Thanks for the question along with the code (as well as posting to Stack
Overflow). In general, if a question is well suited for Stack Overflow, it's
probably better suited to the user@ list than the dev@ list, so I've cc'd the
user@ list for you.

As far as handling empty partitions when working mapPartitions (and
similar), the general approach is to return an empty iterator of the
correct type when you have an empty input iterator.

It looks like your code is doing this; however, you likely have a bug in your
application logic: it assumes that when a partition has a record with a missing
value, either an earlier row in the same partition is good, OR the previous
partition is non-empty and has a good row, which need not necessarily be the
case. You've partially fixed this problem by collecting, for each partition, the
last good value, and then, if you don't have a good value at the start of a
partition, looking up the value in the collected array.

However, if this happens while the previous partition is also empty, you will
need to keep looking back through earlier partitions until you find the value
you are looking for. (Note this assumes that the first record in your dataset is
valid; if it isn't, your code will still fail.)
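To make that concrete, here is a rough sketch of the approach (my own
simplification, assuming an RDD[Option[Double]] where None marks a missing
value to be filled forward):

import org.apache.spark.rdd.RDD

def fillForward(rdd: RDD[Option[Double]]): RDD[Double] = {
  // last good value seen in each partition (None for empty partitions, or
  // partitions with no good value at all)
  val lastGood: Array[Option[Double]] =
    rdd.mapPartitions { it =>
      Iterator(it.foldLeft(Option.empty[Double]) {
        case (_, Some(v)) => Some(v)
        case (acc, None)  => acc
      })
    }.collect()

  rdd.mapPartitionsWithIndex { (idx, it) =>
    if (it.isEmpty) {
      Iterator.empty // empty partition: just return an empty iterator
    } else {
      // carry-in value: walk back past empty (or all-missing) partitions
      var last = (0 until idx).reverse.flatMap(lastGood(_)).headOption
      it.map {
        case Some(v) => last = Some(v); v
        case None    =>
          last.getOrElse(sys.error("no good value before the first missing one"))
      }
    }
  }
}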

Your solution is really close to working but just has some minor
assumptions which don't always necessarily hold.

Cheers,

Holden :)

On Sun, Jan 8, 2017 at 8:30 PM, Liang-Chi Hsieh  wrote:

>
> Hi Georg,
>
> Can you describe your question more clearly?
>
> Actually, the example code you posted on Stack Overflow doesn't crash as you
> said in the post.
>
>
> geoHeil wrote
> > I am working on building a custom ML pipeline-model / estimator to impute
> > missing values, e.g. I want to fill with last good known value.
> > Using a window function is slow / will put the data into a single
> > partition.
> > I built some sample code using the RDD API; however, it has some None / null
> > problems with empty partitions.
> >
> > How should this be implemented properly to handle such empty partitions?
> > http://stackoverflow.com/questions/41474175/spark-
> mappartitionswithindex-handling-empty-partitions
> >
> > Kind regards,
> > Georg
>
>
>
>
>
> -
> Liang-Chi Hsieh | @viirya
> Spark Technology Center
> http://www.spark.tc/
> --
> View this message in context: http://apache-spark-
> developers-list.1001551.n3.nabble.com/handling-of-empty-
> partitions-tp20496p20515.html
> Sent from the Apache Spark Developers List mailing list archive at
> Nabble.com.
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>


-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau


Re: Efficient look up in Key Pair RDD

2017-01-08 Thread ayan guha
It is a Hive construct, supported since Hive 0.10, so I would be very surprised
if Spark does not support it... I can't speak for Spark 2.0 (haven't got a
chance to touch it yet :) )
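For what it's worth, a rough sketch of the syntax (assuming the RAW_TABLE and
the array-summing UDAF from earlier in the thread, registered here under the
hypothetical name SUM_ARRAY):

val result = spark.sql("""
  SELECT Attribute_0, Attribute_1, Attribute_2,
         COUNT(*) AS MATCHES, SUM_ARRAY(DoubleArray) AS TOTALS
  FROM RAW_TABLE
  GROUP BY Attribute_0, Attribute_1, Attribute_2
  GROUPING SETS ((Attribute_0, Attribute_1),
                 (Attribute_0, Attribute_2),
                 (Attribute_1, Attribute_2))
  HAVING COUNT(*) > 1
""")

That way several of the two-attribute combinations are computed in a single
pass over the data instead of one GROUP BY query per combination.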

On Mon, Jan 9, 2017 at 2:33 PM, Anil Langote 
wrote:

> Does it support in Spark Dataset 2.0 ?
>
>
>
>
>
> Thank you
>
> Anil Langote
>
> +1-425-633-9747 <+1%20425-633-9747>
>
>
>
>
>
> *From: *ayan guha 
> *Date: *Sunday, January 8, 2017 at 10:32 PM
> *To: *Anil Langote 
>
> *Subject: *Re: Efficient look up in Key Pair RDD
>
>
>
> Hi
>
>
>
> Please have a look at this wiki page.
> Grouping Set is a variation of GROUP BY where you can specify the
> combinations in one go.
>
>
>
> For example, if you have 2 attributes, you can roll up (ATT1), (ATT1, ATT2),
> and (ATT2) by specifying the groups using grouping sets.
>
>
>
> Best
>
> Ayan
>
>
>
> On Mon, Jan 9, 2017 at 2:29 PM, Anil Langote 
> wrote:
>
> Hi Ayan
>
>
>
> Thanks a lot for the reply. What is GROUPING SET? I did try GROUP BY with a
> UDAF but it doesn't perform well: one combination takes 1.5 mins, and my use
> case has 400 combinations, which will take ~400 mins. I am looking for a
> solution which will scale with the number of combinations.
>
>
>
> Thank you
>
> Anil Langote
>
> +1-425-633-9747 <+1%20425-633-9747>
>
>
>
>
>
> *From: *ayan guha 
> *Date: *Sunday, January 8, 2017 at 10:26 PM
> *To: *Anil Langote 
> *Cc: *Holden Karau , user 
> *Subject: *Re: Efficient look up in Key Pair RDD
>
>
>
> Have you tried something like GROUPING SET? That seems to be the exact
> thing you are looking for
>
>
>
> On Mon, Jan 9, 2017 at 12:37 PM, Anil Langote 
> wrote:
>
> Sure. Let me explain my requirement: I have an input file which has 25
> attribute columns, and the last column is an array of doubles (14500 elements
> in the original file).
>
>
>
> Attribute_0  Attribute_1  Attribute_2  Attribute_3  DoubleArray
> 5  3  5  3  0.2938933463658645  0.0437040427073041  0.23002681025029648  0.18003221216680454
> 3  2  1  3  0.5353599620508771  0.026777650111232787  0.31473082754161674  0.2647786522276575
> 5  3  5  2  0.8803063581705307  0.8101324740101096  0.48523937757683544  0.5897714618376072
> 3  2  1  3  0.33960064683141955  0.46537001358164043  0.543428826489435  0.42653939565053034
> 2  2  0  5  0.5108235777360906  0.4368119043922922  0.8651556676944931  0.7451477943975504
>
>
>
> Now I have to compute the element-wise sum of the double arrays for any given
> combination; for example, for the above file we will have the possible
> combinations below:
>
>
>
> 1.  Attribute_0, Attribute_1
>
> 2.  Attribute_0, Attribute_2
>
> 3.  Attribute_0, Attribute_3
>
> 4.  Attribute_1, Attribute_2
>
> 5.  Attribute_2, Attribute_3
>
> 6.  Attribute_1, Attribute_3
>
> 7.  Attribute_0, Attribute_1, Attribute_2
>
> 8.  Attribute_0, Attribute_1, Attribute_3
>
> 9.  Attribute_0, Attribute_2, Attribute_3
>
> 10.  Attribute_1, Attribute_2, Attribute_3
>
> 11.  Attribute_1, Attribute_2, Attribute_3, Attribute_4
>
>
>
> Now, if we process the Attribute_0, Attribute_1 combination, we want the
> output below. In a similar way we have to process all the above combinations:
>
>
>
> 5_3 ==>  [1.1741997045363952, 0.8538365167174137, 0.7152661878271319,
> 0.7698036740044117]
>
> 3_2 ==> [0.8749606088822967, 0.4921476636928732, 0.8581596540310518,
> 0.6913180478781878]
>
>
>
> Solution tried
>
>
>
> I have created a Parquet file with this schema, where the last column is an
> array of doubles. The Parquet file is 276 GB and has 2.65 M records.
>
>
>
> I have implemented the UDAF which will have
>
>
>
> Input schema : array of doubles
>
> Buffer schema : array of doubles
>
> Return schema : array of doubles
>
>
>
> I load the data from the Parquet file and then register the UDAF to use with
> the query below; note that SUM is the UDAF:
>
>
>
> SELECT COUNT(*) AS MATCHES, SUM(DOUBLEARRAY), Attribute_0, Attribute_1
> FROM RAW_TABLE GROUP BY Attribute_0, Attribute_1 HAVING COUNT(*) > 1
>
>
>
> This works fine and takes 1.2 mins for one combination; my use case will have
> 400 combinations, which means 8 hours, and that does not meet the SLA. We want
> this to be below 1 hour. What is the best way to implement this use case?
>
>
>
> Best Regards,
>
> Anil Langote
>
> +1-425-633-9747
>
>
> On Jan 8, 2017, at 8:17 PM, Holden Karau  wrote:
>
> To start with, caching and having a known partitioner will help a bit; then
> there is also the IndexedRDD project, but in general Spark might not be the
> best tool for the job. Have you considered having Spark output to
> something like memcache?
>
>
>
> What is the goal you are trying to accomplish?
>
>
>
> On Sun, Jan 8, 2017 at 5:04 PM Anil Langote 
> wrote:
>
> Hi All,
>
>
>
> I have a requirement where I want to build a distributed HashMap which holds
> 10M key-value pairs and provides very efficient lookups.

Re: A note about MLlib's StandardScaler

2017-01-08 Thread Holden Karau
Hi Gilad,

Spark uses the sample standard deviation inside of the StandardScaler (see
https://spark.apache.org/docs/2.0.2/api/scala/index.html#org.apache.spark.mllib.feature.StandardScaler
), which I think explains the results you are seeing. I believe the scalers are
intended to be used on larger datasets. You can verify this yourself by doing
the same computation in Python: scaling with the sample standard deviation
gives the values you are seeing from Spark.
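For example, here is a quick sanity check on the first column of the two
vectors from the docs example (plain Scala, nothing Spark-specific):

val col0 = Seq(-2.0, 3.8)
val mean = col0.sum / col0.length                    // 0.9
val sq   = col0.map(x => math.pow(x - mean, 2)).sum  // 16.82

val sampleStd = math.sqrt(sq / (col0.length - 1))    // ~4.101, divides by n - 1
val popStd    = math.sqrt(sq / col0.length)          // 2.9, divides by n

println((-2.0 - mean) / sampleStd)  // ~ -0.7071, what StandardScaler returns
println((-2.0 - mean) / popStd)     // -1.0, the value you expected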

Cheers,

Holden :)


On Sun, Jan 8, 2017 at 12:06 PM, Gilad Barkan 
wrote:

> Hi
>
> It seems that the output of MLlib's StandardScaler(withMean=True,
> withStd=True) is not as expected.
>
> The above configuration is expected to do the following transformation:
>
> X -> Y = (X-Mean)/Std  - Eq.1
>
> This transformation (a.k.a. Standardization) should result in a
> "standardized" vector with unit-variance and zero-mean.
>
> I'll demonstrate my claim using the current documentation example:
>
> >>> vs = [Vectors.dense([-2.0, 2.3, 0]), Vectors.dense([3.8, 0.0, 1.9])]
> >>> dataset = sc.parallelize(vs)
> >>> standardizer = StandardScaler(True, True)
> >>> model = standardizer.fit(dataset)
> >>> result = model.transform(dataset)
> >>> for r in result.collect(): print r
> DenseVector([-0.7071, 0.7071, -0.7071])
> DenseVector([0.7071, -0.7071, 0.7071])
>
> This results in std = sqrt(1/2) for each column instead of std = 1.
>
> Applying the standardization transformation to the above 2 vectors should
> result in the following output:
>
> DenseVector([-1.0, 1.0, -1.0])
> DenseVector([1.0, -1.0, 1.0])
>
>
> Another example:
>
> Adding another DenseVector([2.4, 0.8, 3.5]) to the above, we get 3 rows of
> DenseVectors:
> [DenseVector([-2.0, 2.3, 0.0]), DenseVector([3.8, 0.0, 1.9]),
> DenseVector([2.4, 0.8, 3.5])]
>
> The StandardScaler produces the following scaled vectors:
> [DenseVector([-1.12339, 1.084829, -1.02731]), DenseVector([0.792982,
> -0.88499, 0.057073]), DenseVector([0.330409,
> -0.19984, 0.970241])]
>
> This result has std = sqrt(2/3).
>
> Instead it should have produced 3 vectors with std = 1 for each column.
>
> Adding another vector (4 total) results in 4 scaled vectors with
> std = sqrt(3/4) instead of std = 1.
>
> I hope all the examples help to make my point clear.
>
> I hope I don't miss here something.
>
> Thank you
>
> Gilad Barkan
>
>
>
>
>
>


-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau


How to connect Tableau to databricks spark?

2017-01-08 Thread Raymond Xie
I want to do some data analytics work by leveraging Databricks spark
platform and connect my Tableau desktop to it for data visualization.

Has anyone managed to do it? I've been trying to follow the instructions below
but have not been successful:

https://docs.cloud.databricks.com/docs/latest/databricks_guide/01%20Databricks%20Overview/14%20Third%20Party%20Integrations/01%20Setup%20JDBC%20or%20ODBC.html


I got an error message in Tableau's attempt to connect:

Unable to connect to the server "
ec2-35-160-128-113.us-west-2.compute.amazonaws.com". Check that the server
is running and that you have access privileges to the requested database.

"ec2-35-160-128-113.us-west-2.compute.amazonaws.com" is the hostname of a
EC2 instance I just created on AWS, I may have some missing there though as
I am new to AWS.

I am not sure whether this is related to an account issue; I was using my
Databricks account in Tableau to connect.

Thank you very much. Any clue is appreciated.

Sincerely yours,

Raymond


Re: Efficient look up in Key Pair RDD

2017-01-08 Thread Anil Langote
Hi Ayan

 

Thanks a lot for the reply. What is GROUPING SET? I did try GROUP BY with a
UDAF but it doesn't perform well: one combination takes 1.5 mins, and my use
case has 400 combinations, which will take ~400 mins. I am looking for a
solution which will scale with the number of combinations.

 

Thank you

Anil Langote

+1-425-633-9747

 

 

From: ayan guha 
Date: Sunday, January 8, 2017 at 10:26 PM
To: Anil Langote 
Cc: Holden Karau , user 
Subject: Re: Efficient look up in Key Pair RDD

 

Have you tried something like GROUPING SET? That seems to be the exact thing 
you are looking for

 

On Mon, Jan 9, 2017 at 12:37 PM, Anil Langote  wrote:

Sure. Let me explain my requirement: I have an input file which has 25
attribute columns, and the last column is an array of doubles (14500 elements
in the original file).

 

Attribute_0  Attribute_1  Attribute_2  Attribute_3  DoubleArray
5  3  5  3  0.2938933463658645  0.0437040427073041  0.23002681025029648  0.18003221216680454
3  2  1  3  0.5353599620508771  0.026777650111232787  0.31473082754161674  0.2647786522276575
5  3  5  2  0.8803063581705307  0.8101324740101096  0.48523937757683544  0.5897714618376072
3  2  1  3  0.33960064683141955  0.46537001358164043  0.543428826489435  0.42653939565053034
2  2  0  5  0.5108235777360906  0.4368119043922922  0.8651556676944931  0.7451477943975504
 

Now I have to compute the element-wise sum of the double arrays for any given
combination; for example, for the above file we will have the possible
combinations below:

 

1.  Attribute_0, Attribute_1

2.  Attribute_0, Attribute_2

3.  Attribute_0, Attribute_3

4.  Attribute_1, Attribute_2

5.  Attribute_2, Attribute_3

6.  Attribute_1, Attribute_3

7.  Attribute_0, Attribute_1, Attribute_2

8.  Attribute_0, Attribute_1, Attribute_3

9.  Attribute_0, Attribute_2, Attribute_3

10.  Attribute_1, Attribute_2, Attribute_3

11.  Attribute_1, Attribute_2, Attribute_3, Attribute_4

 

Now, if we process the Attribute_0, Attribute_1 combination, we want the output
below. In a similar way we have to process all the above combinations:

 

5_3 ==>  [1.1741997045363952, 0.8538365167174137, 0.7152661878271319, 
0.7698036740044117]

3_2 ==> [0.8749606088822967, 0.4921476636928732, 0.8581596540310518, 
0.6913180478781878]

 

Solution tried

 

I have created a Parquet file with this schema, where the last column is an
array of doubles. The Parquet file is 276 GB and has 2.65 M records.

 

I have implemented the UDAF which will have 

 

Input schema : array of doubles

Buffer schema : array of doubles 

Return schema : array of doubles

 

I load the data from the Parquet file and then register the UDAF to use with
the query below; note that SUM is the UDAF:

 

SELECT COUNT(*) AS MATCHES, SUM(DOUBLEARRAY), Attribute_0, Attribute_1 FROM 
RAW_TABLE GROUP BY Attribute_0, Attribute_1 HAVING COUNT(*)>1

 

This works fine and takes 1.2 mins for one combination; my use case will have
400 combinations, which means 8 hours, and that does not meet the SLA. We want
this to be below 1 hour. What is the best way to implement this use case?

 

Best Regards,

Anil Langote

+1-425-633-9747


On Jan 8, 2017, at 8:17 PM, Holden Karau  wrote:

To start with, caching and having a known partitioner will help a bit; then
there is also the IndexedRDD project, but in general Spark might not be the best
tool for the job. Have you considered having Spark output to something like
memcache?

What is the goal you are trying to accomplish?

 

On Sun, Jan 8, 2017 at 5:04 PM Anil Langote  wrote:

Hi All,

 

I have a requirement where I want to build a distributed HashMap which holds
10M key-value pairs and provides very efficient lookups for each key. I tried
loading the file into a JavaPairRDD and calling the lookup method; it is very
slow.

How can I achieve very fast lookups by a given key?

 

Thank you

Anil Langote 



 

-- 

Best Regards,
Ayan Guha



Re: Efficient look up in Key Pair RDD

2017-01-08 Thread ayan guha
Have you tried something like GROUPING SET? That seems to be the exact
thing you are looking for

On Mon, Jan 9, 2017 at 12:37 PM, Anil Langote 
wrote:

> Sure. Let me explain my requirement: I have an input file which has 25
> attribute columns, and the last column is an array of doubles (14500 elements
> in the original file).
>
>
>
> Attribute_0  Attribute_1  Attribute_2  Attribute_3  DoubleArray
> 5  3  5  3  0.2938933463658645  0.0437040427073041  0.23002681025029648  0.18003221216680454
> 3  2  1  3  0.5353599620508771  0.026777650111232787  0.31473082754161674  0.2647786522276575
> 5  3  5  2  0.8803063581705307  0.8101324740101096  0.48523937757683544  0.5897714618376072
> 3  2  1  3  0.33960064683141955  0.46537001358164043  0.543428826489435  0.42653939565053034
> 2  2  0  5  0.5108235777360906  0.4368119043922922  0.8651556676944931  0.7451477943975504
>
>
>
> Now I have to compute the element-wise sum of the double arrays for any given
> combination; for example, for the above file we will have the possible
> combinations below:
>
>
>
> 1.  Attribute_0, Attribute_1
>
> 2.  Attribute_0, Attribute_2
>
> 3.  Attribute_0, Attribute_3
>
> 4.  Attribute_1, Attribute_2
>
> 5.  Attribute_2, Attribute_3
>
> 6.  Attribute_1, Attribute_3
>
> 7.  Attribute_0, Attribute_1, Attribute_2
>
> 8.  Attribute_0, Attribute_1, Attribute_3
>
> 9.  Attribute_0, Attribute_2, Attribute_3
>
> 10.  Attribute_1, Attribute_2, Attribute_3
>
> 11.  Attribute_1, Attribute_2, Attribute_3, Attribute_4
>
>
>
> Now, if we process the Attribute_0, Attribute_1 combination, we want the
> output below. In a similar way we have to process all the above combinations:
>
>
>
> 5_3 ==>  [1.1741997045363952, 0.8538365167174137, 0.7152661878271319,
> 0.7698036740044117]
>
> 3_2 ==> [0.8749606088822967, 0.4921476636928732, 0.8581596540310518,
> 0.6913180478781878]
>
>
>
> Solution tried
>
>
>
> I have created a Parquet file with this schema, where the last column is an
> array of doubles. The Parquet file is 276 GB and has 2.65 M records.
>
>
>
> I have implemented the UDAF which will have
>
>
>
> Input schema : array of doubles
>
> Buffer schema : array of doubles
>
> Return schema : array of doubles
>
>
>
> I load the data from the Parquet file and then register the UDAF to use with
> the query below; note that SUM is the UDAF:
>
>
>
> SELECT COUNT(*) AS MATCHES, SUM(DOUBLEARRAY), Attribute_0, Attribute_1
> FROM RAW_TABLE GROUP BY Attribute_0, Attribute_1 HAVING COUNT(*) > 1
>
>
>
> This works fine and takes 1.2 mins for one combination; my use case will have
> 400 combinations, which means 8 hours, and that does not meet the SLA. We want
> this to be below 1 hour. What is the best way to implement this use case?
>
> Best Regards,
>
> Anil Langote
>
> +1-425-633-9747
>
> On Jan 8, 2017, at 8:17 PM, Holden Karau  wrote:
>
> To start with, caching and having a known partitioner will help a bit; then
> there is also the IndexedRDD project, but in general Spark might not be the
> best tool for the job. Have you considered having Spark output to
> something like memcache?
>
> What is the goal you are trying to accomplish?
>
> On Sun, Jan 8, 2017 at 5:04 PM Anil Langote 
> wrote:
>
>> Hi All,
>>
>> I have a requirement where I want to build a distributed HashMap which holds
>> 10M key-value pairs and provides very efficient lookups for each key. I tried
>> loading the file into a JavaPairRDD and calling the lookup method; it is very
>> slow.
>>
>> How can I achieve very fast lookups by a given key?
>>
>> Thank you
>> Anil Langote
>>
>


-- 
Best Regards,
Ayan Guha


subscription

2017-01-08 Thread Raymond Xie
Sincerely yours,

Raymond


Re: Efficient look up in Key Pair RDD

2017-01-08 Thread Anil Langote
Sure. Let me explain my requirement: I have an input file which has 25
attribute columns, and the last column is an array of doubles (14500 elements
in the original file).
 
Attribute_0  Attribute_1  Attribute_2  Attribute_3  DoubleArray
5  3  5  3  0.2938933463658645  0.0437040427073041  0.23002681025029648  0.18003221216680454
3  2  1  3  0.5353599620508771  0.026777650111232787  0.31473082754161674  0.2647786522276575
5  3  5  2  0.8803063581705307  0.8101324740101096  0.48523937757683544  0.5897714618376072
3  2  1  3  0.33960064683141955  0.46537001358164043  0.543428826489435  0.42653939565053034
2  2  0  5  0.5108235777360906  0.4368119043922922  0.8651556676944931  0.7451477943975504
 
Now I have to compute the element-wise sum of the double arrays for any given
combination; for example, for the above file we will have the possible
combinations below:
 
1.  Attribute_0, Attribute_1
2.  Attribute_0, Attribute_2
3.  Attribute_0, Attribute_3
4.  Attribute_1, Attribute_2
5.  Attribute_2, Attribute_3
6.  Attribute_1, Attribute_3
7.  Attribute_0, Attribute_1, Attribute_2
8.  Attribute_0, Attribute_1, Attribute_3
9.  Attribute_0, Attribute_2, Attribute_3
10.  Attribute_1, Attribute_2, Attribute_3
11.  Attribute_1, Attribute_2, Attribute_3, Attribute_4
 
Now, if we process the Attribute_0, Attribute_1 combination, we want the output
below. In a similar way we have to process all the above combinations:
 
5_3 ==>  [1.1741997045363952, 0.8538365167174137, 0.7152661878271319, 
0.7698036740044117]
3_2 ==> [0.8749606088822967, 0.4921476636928732, 0.8581596540310518, 
0.6913180478781878]
 
Solution tried
 
I have created a Parquet file with this schema, where the last column is an
array of doubles. The Parquet file is 276 GB and has 2.65 M records.
 
I have implemented the UDAF which will have 
 
Input schema : array of doubles
Buffer schema : array of doubles 
Return schema : array of doubles
 
I load the data from the Parquet file and then register the UDAF to use with
the query below; note that SUM is the UDAF:
 
SELECT COUNT(*) AS MATCHES, SUM(DOUBLEARRAY), Attribute_0, Attribute_1 FROM 
RAW_TABLE GROUP BY Attribute_0, Attribute_1 HAVING COUNT(*)>1
 
This works fine and takes 1.2 mins for one combination; my use case will have
400 combinations, which means 8 hours, and that does not meet the SLA. We want
this to be below 1 hour. What is the best way to implement this use case?
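For reference, a minimal sketch of what such a UDAF can look like (an
element-wise sum of fixed-length double arrays; the details here are
illustrative, and the array length is assumed to be known up front):

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// illustrative element-wise sum over arrays of doubles
class SumDoubleArray(numElements: Int) extends UserDefinedAggregateFunction {
  def inputSchema: StructType  = StructType(StructField("value", ArrayType(DoubleType)) :: Nil)
  def bufferSchema: StructType = StructType(StructField("sum", ArrayType(DoubleType)) :: Nil)
  def dataType: DataType       = ArrayType(DoubleType)
  def deterministic: Boolean   = true

  def initialize(buffer: MutableAggregationBuffer): Unit =
    buffer(0) = Seq.fill(numElements)(0.0)

  def update(buffer: MutableAggregationBuffer, input: Row): Unit =
    buffer(0) = buffer.getSeq[Double](0).zip(input.getSeq[Double](0)).map { case (a, b) => a + b }

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
    buffer1(0) = buffer1.getSeq[Double](0).zip(buffer2.getSeq[Double](0)).map { case (a, b) => a + b }

  def evaluate(buffer: Row): Any = buffer.getSeq[Double](0)
}

// spark.udf.register("SUM", new SumDoubleArray(14500))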

Best Regards,
Anil Langote
+1-425-633-9747

> On Jan 8, 2017, at 8:17 PM, Holden Karau  wrote:
> 
> To start with, caching and having a known partitioner will help a bit; then
> there is also the IndexedRDD project, but in general Spark might not be the
> best tool for the job. Have you considered having Spark output to something
> like memcache?
>
> What is the goal you are trying to accomplish?
> 
>> On Sun, Jan 8, 2017 at 5:04 PM Anil Langote  
>> wrote:
>> Hi All,
>> 
>> I have a requirement where I want to build a distributed HashMap which holds
>> 10M key-value pairs and provides very efficient lookups for each key. I tried
>> loading the file into a JavaPairRDD and calling the lookup method; it is very
>> slow.
>>
>> How can I achieve very fast lookups by a given key?
>> 
>> Thank you
>> Anil Langote 


Re: Efficient look up in Key Pair RDD

2017-01-08 Thread Holden Karau
To start with, caching and having a known partitioner will help a bit; then
there is also the IndexedRDD project, but in general Spark might not be the
best tool for the job. Have you considered having Spark output to
something like memcache?

What is the goal you are trying to accomplish?
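Something like this is what I mean by a known partitioner (a sketch only; the
key/value types and the partition count are just examples):

import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD

// with an explicit partitioner in place and the data cached, rdd.lookup(key)
// only scans the single partition that can contain the key
def buildIndex(pairs: RDD[(String, Array[Double])]): RDD[(String, Array[Double])] = {
  val indexed = pairs
    .partitionBy(new HashPartitioner(200))
    .cache()
  indexed.count() // materialize the cache once
  indexed
}

// usage: val indexed = buildIndex(pairs); indexed.lookup("someKey")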

On Sun, Jan 8, 2017 at 5:04 PM Anil Langote 
wrote:

> Hi All,
>
> I have a requirement where I want to build a distributed HashMap which holds
> 10M key-value pairs and provides very efficient lookups for each key. I tried
> loading the file into a JavaPairRDD and calling the lookup method; it is very
> slow.
>
> How can I achieve very fast lookups by a given key?
>
> Thank you
> Anil Langote
>


Efficient look up in Key Pair RDD

2017-01-08 Thread Anil Langote
Hi All,

I have a requirement where I want to build a distributed HashMap which holds
10M key-value pairs and provides very efficient lookups for each key. I tried
loading the file into a JavaPairRDD and calling the lookup method; it is very
slow.

How can I achieve very fast lookups by a given key?

Thank you
Anil Langote


Re:Docker image for Spark streaming app

2017-01-08 Thread vvshvv
Hi,

I am running spark streaming job using spark jobserver via this image:

https://hub.docker.com/r/depend/spark-jobserver/.

It works well in standalone mode (when using Mesos, the job does not make
progress). The Spark Jobserver that supports Spark 2.0 has a new API that is
only suitable for non-streaming jobs; for streaming you have to specify a
StreamingContextFactory that still uses the old API, so you can just use the
old API for now.



Sent from my Mi phone

On Jan 8, 2017 11:51 PM, shyla deshpande wrote:

I am looking for a Docker image that I can use from Docker Hub for running a
Spark streaming app with Scala and Spark 2.0+. I am new to Docker and unable to
find an image on Docker Hub that suits my needs. Please let me know if anyone is
using Docker for a Spark streaming app, and share your experience.

Thanks


Docker image for Spark streaming app

2017-01-08 Thread shyla deshpande
I am looking for a Docker image that I can use from Docker Hub for running a
Spark streaming app with Scala and Spark 2.0+.

I am new to Docker and unable to find an image on Docker Hub that suits my
needs. Please let me know if anyone is using Docker for a Spark streaming app,
and share your experience.

Thanks


[META] What to do about "This post has NOT been accepted by the mailing list yet." ?

2017-01-08 Thread dpapathanasiou
I both subscribed to the mailing list and registered with Nabble, and I was
able to post my question[1] there yesterday.

While it has gotten a few views, there is no answer yet, and I also saw this
warning:

"This post has NOT been accepted by the mailing list yet."

How do I resolve that, so the people on the mailing list will see the
question?

[1]
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-an-anonymous-function-with-DataFrame-explode-td28285.html



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/META-What-to-do-about-This-post-has-NOT-been-accepted-by-the-mailing-list-yet-tp28288.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Storage history in web UI

2017-01-08 Thread Appu K
@jacek - thanks a lot for the book

@joe - it looks like the REST API also exposes a few endpoints, such as
/applications/[app-id]/storage/rdd
/applications/[app-id]/storage/rdd/[rdd-id]
that might be of interest to you:
http://spark.apache.org/docs/latest/monitoring.html





On 9 January 2017 at 12:07:34 AM, Jacek Laskowski (ja...@japila.pl) wrote:

Hi,

A possible workaround: use a SparkListener and save the results to a custom
sink.

After all, the web UI is merely a bag of SparkListeners plus excellent
visualizations.

Jacek

On 3 Jan 2017 4:14 p.m., "Joseph Naegele" 
wrote:

Hi all,

Is there any way to observe Storage history in Spark, i.e. which RDDs were
cached and where, etc. after an application completes? It appears the
Storage tab in the History Server UI is useless.

Thanks
---
Joe Naegele
Grier Forensics



-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org


Re: Storage history in web UI

2017-01-08 Thread Jacek Laskowski
Hi,

A possible workaround: use a SparkListener and save the results to a custom
sink.

After all, the web UI is merely a bag of SparkListeners plus excellent
visualizations.
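A bare-bones sketch of what I mean (the listener name is mine; write the
events to whatever sink suits you instead of println):

import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted, SparkListenerUnpersistRDD}

// records RDD storage information as the application runs
class StorageHistoryListener extends SparkListener {
  override def onStageCompleted(stage: SparkListenerStageCompleted): Unit =
    stage.stageInfo.rddInfos.filter(_.isCached).foreach { rdd =>
      println(s"stage ${stage.stageInfo.stageId}: RDD ${rdd.id} (${rdd.name}) " +
        s"cached, ${rdd.memSize} bytes in memory, ${rdd.diskSize} bytes on disk")
    }

  override def onUnpersistRDD(event: SparkListenerUnpersistRDD): Unit =
    println(s"RDD ${event.rddId} unpersisted")
}

// sc.addSparkListener(new StorageHistoryListener())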

Jacek

On 3 Jan 2017 4:14 p.m., "Joseph Naegele" 
wrote:

Hi all,

Is there any way to observe Storage history in Spark, i.e. which RDDs were
cached and where, etc. after an application completes? It appears the
Storage tab in the History Server UI is useless.

Thanks
---
Joe Naegele
Grier Forensics



-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org


Spark UI - Puzzling “Input Size / Records” in Stage Details

2017-01-08 Thread Appu K
I was trying something basic to understand tasks, stages, and shuffles a bit
better in Spark. The dataset is 256 MB.

Tried this in zeppelin

val tmpDF = spark.read
  .option("header", "true")
  .option("delimiter", ",")
  .option("inferSchema", "true")
  .csv("s3://l4b-d4t4/wikipedia/pageviews-by-second-tsv")
tmpDF.count

This kicked off 4 jobs -

   - 3 jobs for the first statement and
   - 1 job with 2 stages for tmpDF.count

The last stage of the job that corresponded to the count statement has some
puzzling data that I'm unable to explain.

   1. The stage details section says "Input Size / Records: 186.6 MB / 720",
      while Aggregated Metrics by Executor says "Input Size / Records" is
      "186.6 MB / 5371292" (Stage Details UI).

   2. In the tasks list, one particular server,
      ip-x-x-x-60.eu-west-1.compute.internal, has 4 tasks with "0.0 B / 457130"
      as the value for "Input Size / Records" (Task Details UI).

I initially thought this was some local disk cache or something that has to do
with EMRFS. However, once I cached the DataFrame and took the count again, it
showed "16.8 MB / 46" for all 16 tasks corresponding to the 16 partitions.

Any links/pointers to understand this a bit better would be highly appreciated.


Unable to explain the job kicked off for spark.read.csv

2017-01-08 Thread Appu K
I was trying to create a base-data-frame in an EMR cluster from a csv file
using

val baseDF =
spark.read.csv("s3://l4b-d4t4/wikipedia/pageviews-by-second-tsv")

I omitted the options to infer the schema and specify the header, just to
understand what happens behind the scenes.


The Spark UI shows that this kicked off a job with one stage. The stage
shows that a filter was applied.

I got a little curious about this. Is there any place where I could better
understand why a filter was applied here and why there was an action in this
case?


thanks


Re: Spark 2.0.2, KyroSerializer, double[] is not registered.

2017-01-08 Thread Sean Owen
Yes, the class Kryo seems to register all of these primitive array classes
already. That's been true for a long time.

However, in KryoSerializer I see the following. I wonder why double[], int[],
and other primitive array types are missing? I can't think of a good reason.
Imran, do you know? I think you added some of these a long time ago.

If Kryo registers these then I don't know why this was ever necessary, but
if it's necessary for some reason I'm missing, then it may be important to
add all primitive types here.


private[serializer] object KryoSerializer {
  // Commonly used classes.
  private val toRegister: Seq[Class[_]] = Seq(
ByteBuffer.allocate(1).getClass,
classOf[StorageLevel],
classOf[CompressedMapStatus],
classOf[HighlyCompressedMapStatus],
classOf[CompactBuffer[_]],
classOf[BlockManagerId],
classOf[Array[Byte]],
classOf[Array[Short]],
classOf[Array[Long]],
classOf[BoundedPriorityQueue[_]],
classOf[SparkConf]
  )


On Sun, Jan 8, 2017 at 1:29 PM 颜发才(Yan Facai)  wrote:

> Hi, Owen,
> it is fixed after registering manually:
> conf.registerKryoClasses(Array(Class.forName("[D")))
>
>
> I believe that Kryo (latest version) has supported double[]:
> addDefaultSerializer(double[].class, DoubleArraySerializer.class);
>
> Why does it break in Spark?
>
>
>
> On Sun, Jan 8, 2017 at 6:03 PM, Sean Owen  wrote:
>
> Double[] is not of the same class as double[]. Kryo should already know
> how to serialize double[], but I doubt Double[] is registered.
>
> The error does seem to clearly indicate double[] though. That surprises
> me.  Can you try manually registering it to see if that fixes it?
> But then I'm not sure why tests wouldn't catch this.
>
> On Sun, Jan 8, 2017 at 7:30 AM smartzjp  wrote:
>
> You can have a try the following code.
>
> ObjectArraySerializer serializer = new ObjectArraySerializer(kryo, Double
> [].class);
> kryo.register(Double[].class, serializer);
>
>
> ---
>
> Hi, all.
> I enable Kryo in Spark with spark-defaults.conf:
>  spark.serializer
> org.apache.spark.serializer.KryoSerializer
>  spark.kryo.registrationRequired  true
>
> A KryoException is raised when a logistic regression algorithm is running:
>  Note: To register this class use: kryo.register(double[].class);
>  Serialization trace:
>  currL1 (org.apache.spark.mllib.stat.MultivariateOnlineSummarizer)
> at
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
> at
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
> at
> com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
> at
> com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:36)
> at
> com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)
>at
> com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>
> My question is:
> Isn't double[].class supported by default?
>
> Thanks.
>
>
>


Re: Spark 2.0.2, KyroSerializer, double[] is not registered.

2017-01-08 Thread Yan Facai
Hi, Owen,
it is fixed after registering manually:
conf.registerKryoClasses(Array(Class.forName("[D")))
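Equivalently, and a bit more readable, the primitive array classes can be
registered directly (a sketch, assuming registrationRequired stays on):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrationRequired", "true")
  // classOf[Array[Double]] is the same class as Class.forName("[D")
  .registerKryoClasses(Array(
    classOf[Array[Double]],
    classOf[Array[Int]],
    classOf[Array[Float]]
  ))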


I believe that Kryo (latest version) has supported double[]:
addDefaultSerializer(double[].class, DoubleArraySerializer.class);


Why does it break in Spark?



On Sun, Jan 8, 2017 at 6:03 PM, Sean Owen  wrote:

> Double[] is not of the same class as double[]. Kryo should already know
> how to serialize double[], but I doubt Double[] is registered.
>
> The error does seem to clearly indicate double[] though. That surprises
> me.  Can you try manually registering it to see if that fixes it?
> But then I'm not sure why tests wouldn't catch this.
>
> On Sun, Jan 8, 2017 at 7:30 AM smartzjp  wrote:
>
>> You can have a try the following code.
>>
>> ObjectArraySerializer serializer = new ObjectArraySerializer(kryo, Double
>> [].class);
>> kryo.register(Double[].class, serializer);
>>
>>
>> ---
>>
>> Hi, all.
>> I enable Kryo in Spark with spark-defaults.conf:
>>  spark.serializer org.apache.spark.serializer.
>> KryoSerializer
>>  spark.kryo.registrationRequired  true
>>
>> A KryoException is raised when a logistic regression algorithm is running:
>>  Note: To register this class use: kryo.register(double[].class);
>>  Serialization trace:
>>  currL1 (org.apache.spark.mllib.stat.MultivariateOnlineSummarizer)
>> at com.esotericsoftware.kryo.serializers.FieldSerializer$
>> ObjectField.write(FieldSerializer.java:585)
>> at com.esotericsoftware.kryo.serializers.FieldSerializer.
>> write(FieldSerializer.java:213)
>> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.
>> java:568)
>> at com.twitter.chill.Tuple2Serializer.write(
>> TupleSerializers.scala:36)
>> at com.twitter.chill.Tuple2Serializer.write(
>> TupleSerializers.scala:33)
>>at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.
>> java:568)
>>
>> My question is:
>> Isn't double[].class supported by default?
>>
>> Thanks.
>>
>>


Re: Spark 2.0.2, KyroSerializer, double[] is not registered.

2017-01-08 Thread Sean Owen
Double[] is not of the same class as double[]. Kryo should already know how
to serialize double[], but I doubt Double[] is registered.

The error does seem to clearly indicate double[] though. That surprises
me.  Can you try manually registering it to see if that fixes it?
But then I'm not sure why tests wouldn't catch this.

On Sun, Jan 8, 2017 at 7:30 AM smartzjp  wrote:

> You can have a try the following code.
>
> ObjectArraySerializer serializer = new ObjectArraySerializer(kryo, Double
> [].class);
> kryo.register(Double[].class, serializer);
>
>
> ---
>
> Hi, all.
> I enable Kryo in Spark with spark-defaults.conf:
>  spark.serializer
> org.apache.spark.serializer.KryoSerializer
>  spark.kryo.registrationRequired  true
>
> A KryoException is raised when a logistic regression algorithm is running:
>  Note: To register this class use: kryo.register(double[].class);
>  Serialization trace:
>  currL1 (org.apache.spark.mllib.stat.MultivariateOnlineSummarizer)
> at
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
> at
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
> at
> com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
> at
> com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:36)
> at
> com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)
>at
> com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>
> My question is:
> Isn't double[].class supported by default?
>
> Thanks.
>
>