RE: Do we really need mesos or yarn? or is standalone sufficient?

2016-12-16 Thread Saif.A.Ellafi
In my experience, standalone works very well in a small cluster where there isn't anything else running.

In a bigger cluster or with shared resources, YARN wins, despite the overhead of spawning containers as opposed to an always-running background worker.

Best is if you try both: if standalone is good enough, keep it until you need more. Otherwise, try YARN or Mesos depending on the rest of your components.

2cents

Saif

From: kant kodali [mailto:kanth...@gmail.com]
Sent: Friday, December 16, 2016 3:14 AM
To: user @spark
Subject: Do we really need mesos or yarn? or is standalone sufficient?

Do we really need Mesos or YARN, or is standalone sufficient for production systems? I understand the difference, but I don't know the capabilities of a standalone cluster. Does anyone have experience deploying standalone in production?
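For readers weighing the same choice, a minimal sketch of how little the application code itself cares about the cluster manager at this level: only the master URL changes between standalone and YARN (the host, port and app name below are placeholders, not from the thread).

import org.apache.spark.{SparkConf, SparkContext}

// Standalone vs. YARN is decided at launch time via the master URL;
// the job code stays the same. "master-host:7077" is a placeholder.
val conf = new SparkConf()
  .setAppName("cluster-manager-demo")
  .setMaster("spark://master-host:7077") // standalone; "yarn-client"/"yarn-cluster" on YARN (1.x syntax)

val sc = new SparkContext(conf)
println(sc.parallelize(1 to 1000).sum()) // trivial job to verify the cluster works
sc.stop()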




Cluster deploy mode driver location

2016-11-21 Thread Saif.A.Ellafi
Hello there,

I have a Spark program on 1.6.1; however, when I submit it to the cluster, it picks the driver host at random.

I know there is a driver-specification option, but along with it it seems mandatory to define many other options I am not familiar with. The trouble is, the jars I am launching need to be available at the driver host, and I would like to keep those jars on just one specific host, which I would like to be the driver.

Any help?

Thanks!
Saif



RE: Applying a limit after orderBy of big dataframe hangs spark

2016-08-05 Thread Saif.A.Ellafi
Hi thanks for the assistance,


1.   Standalone

2.   df.orderBy(field).limit(5000).write.parquet(...)

From: Mich Talebzadeh [mailto:mich.talebza...@gmail.com]
Sent: Friday, August 05, 2016 4:33 PM
To: Ellafi, Saif A.
Cc: user @spark
Subject: Re: Applying a limit after orderBy of big dataframe hangs spark

Hi,

  1.  What scheduler are you using: standalone, YARN, etc.?
  2.  How are you limiting the df output?

HTH




Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com






On 5 August 2016 at 19:54, 
> wrote:
Hi all,

I am working with a 1.5 billion row dataframe in a small cluster and trying to apply an orderBy operation on one of the LongType columns.

If I limit that output to some number, say 5 million, then trying to count, persist or store the dataframe makes Spark crash, losing executors and hanging.
Not limiting the dataframe after the orderBy works normally, i.e. it is fine when writing the full 1.5 billion rows again.

Any thoughts? Using Spark 1.6.0, Scala 2.11.

Saif
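A hedged sketch of one workaround to try, under the assumption that the hang comes from the near-single-reducer plan that a limit after a global sort tends to produce; the column name "ts", the sample fraction and the paths are placeholders, not from the thread. The idea is to estimate a cutoff from a sample, filter first, and only then sort and trim the much smaller result.

import org.apache.spark.sql.functions._

// 1) Estimate a cutoff value for the ordering column from a small sample.
//    0.001 of ~1.5B rows is ~1.5M sampled rows; limiting to 6000 of them
//    over-shoots the 5M target slightly so the later limit still has enough rows.
val cutoff = df.select("ts")
  .sample(false, 0.001)
  .orderBy("ts")
  .limit(6000)
  .collect()
  .last.getLong(0)

// 2) Keep only rows below the cutoff (a small fraction of the data),
//    then sort and trim that much smaller DataFrame before writing.
df.filter(col("ts") <= cutoff)
  .orderBy("ts")
  .limit(5000000)
  .write.parquet("/tmp/top_rows_by_ts.parquet")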




Applying a limit after orderBy of big dataframe hangs spark

2016-08-05 Thread Saif.A.Ellafi
Hi all,

I am working with a 1.5 billion row dataframe in a small cluster and trying to apply an orderBy operation on one of the LongType columns.

If I limit that output to some number, say 5 million, then trying to count, persist or store the dataframe makes Spark crash, losing executors and hanging.
Not limiting the dataframe after the orderBy works normally, i.e. it is fine when writing the full 1.5 billion rows again.

Any thoughts? Using Spark 1.6.0, Scala 2.11.

Saif



multiple SPARK_LOCAL_DIRS causing strange behavior in parallelism

2016-07-29 Thread Saif.A.Ellafi
Hi all,

I have been playing around with SPARK_LOCAL_DIRS in spark-env in order to add additional shuffle storage.

But since I did this, I am getting "too many open files" errors when the total executor cores are high. I am also getting low parallelism: monitoring the running tasks on some big jobs, most tasks run on the driver host and very few on the other nodes, while using ANY locality.

Generally speaking, could I be doing anything wrong with this setting?

On each node I am pointing to different local physical hard drives to store the shuffle data. Reverting this configuration to a single folder per node, everything runs normally.

Thanks,
Saif
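For reference, a hedged sketch of the equivalent application-level setting: spark.local.dir takes a comma-separated list of directories, and the SPARK_LOCAL_DIRS environment variable on standalone workers overrides it (the paths below are placeholders).

import org.apache.spark.SparkConf

// Comma-separated list of scratch directories for shuffle/spill files;
// on a standalone worker, SPARK_LOCAL_DIRS in spark-env.sh takes precedence.
val conf = new SparkConf()
  .set("spark.local.dir", "/data1/spark-tmp,/data2/spark-tmp")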



RE: Strategies for properly load-balanced partitioning

2016-06-03 Thread Saif.A.Ellafi
Appreciate the follow up.

I am not entirely sure how or why my question is related to bucketing capabilities. It indeed sounds like a powerful feature to avoid shuffling, but in my case I am referring to the straightforward process of reading data and writing it to parquet.
If bucketed tables allow setting up buckets before reading, and specifying the parallelism when writing directly, then you hit the nail on the head.

My problem is that reading from source (usually hundreds of text files) turns into 10k+ partition dataframes, based on the partitions' block size and number of data splits. Writing these back is a huge overhead for parquet and requires repartitioning in order to reduce heap memory usage, especially on wide tables.

Let see how it goes.
Saif


From: Ovidiu-Cristian MARCU [mailto:ovidiu-cristian.ma...@inria.fr]
Sent: Friday, June 03, 2016 2:55 PM
To: Ellafi, Saif A.
Cc: user; Reynold Xin; mich...@databricks.com
Subject: Re: Strategies for properly load-balanced partitioning

I suppose you are running on 1.6.
I guess you need some solution based on [1], [2] features which are coming in 
2.0.

[1] https://issues.apache.org/jira/browse/SPARK-12538 / 
https://issues.apache.org/jira/browse/SPARK-12394
[2] https://issues.apache.org/jira/browse/SPARK-12849

However, I did not check for examples; I would like to add to your question and ask the community to link to some examples of the recent improvements/changes.

It could help, however, to give a concrete example of your specific problem, as you may also be hitting stragglers, probably caused by data skew.

Best,
Ovidiu


On 03 Jun 2016, at 17:31, 
saif.a.ell...@wellsfargo.com wrote:

Hello everyone!

I was noticing that, when reading parquet files or actually any kind of source dataframe data (spark-csv, etc.), the default partitioning is not fair.
Action tasks usually run very fast on some partitions and very slowly on others, and frequently fast on all but the last partition (which looks like it reads 50%+ of the input data size).

I notice that most tasks load some portion of the data, say 1024 MB chunks, while some tasks load 20+ GB of data.

Applying repartition strategies solves this issue properly and general performance increases considerably, but for very large dataframes repartitioning is a costly process.

In short, what are the available strategies or configurations that help read from disk or HDFS with a proper executor data distribution?

If this needs to be more specific, I am strictly focused on PARQUET files from HDFS. I know there are some MIN

Really appreciate,
Saif
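A hedged, 1.6-era sketch of two knobs worth trying here: asking the Hadoop input format for larger splits so fewer, bigger partitions are created at read time, and using coalesce instead of repartition when only shrinking the partition count. Whether the Parquet reader honors these exact keys in this version should be verified; the sizes and paths are placeholders.

// Larger input splits -> fewer read partitions (verify these keys apply to your reader/version).
val splitBytes = 256L * 1024 * 1024
sc.hadoopConfiguration.setLong("mapreduce.input.fileinputformat.split.minsize", splitBytes)
sc.hadoopConfiguration.setLong("mapreduce.input.fileinputformat.split.maxsize", splitBytes)

val df = sqlContext.read.parquet("/path/to/source")

// coalesce() only merges partitions (no shuffle), so it is much cheaper than
// repartition() when the goal is just to reduce a 10k+ partition count.
df.coalesce(512).write.parquet("/path/to/output")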



Strategies for properly load-balanced partitioning

2016-06-03 Thread Saif.A.Ellafi
Hello everyone!

I was noticing that, when reading parquet files or actually any kind of source dataframe data (spark-csv, etc.), the default partitioning is not fair.
Action tasks usually run very fast on some partitions and very slowly on others, and frequently fast on all but the last partition (which looks like it reads 50%+ of the input data size).

I notice that most tasks load some portion of the data, say 1024 MB chunks, while some tasks load 20+ GB of data.

Applying repartition strategies solves this issue properly and general performance increases considerably, but for very large dataframes repartitioning is a costly process.

In short, what are the available strategies or configurations that help read from disk or HDFS with a proper executor data distribution?

If this needs to be more specific, I am strictly focused on PARQUET files from HDFS. I know there are some MIN

Really appreciate,
Saif



spark-submit not adding application jar to classpath

2016-04-18 Thread Saif.A.Ellafi
Hi,

I am submitting a jar file with spark-submit, which has some content inside src/main/resources.
I am unable to access those resources, since the application jar is not being added to the classpath.

This works fine if I also include the application jar in the --driver-class-path entry.

Is this expected? Any thoughts?
Saif
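For context, a minimal sketch of the kind of classpath-dependent lookup that breaks when the application jar is missing from the driver's classpath ("/config/app.conf" is a placeholder resource under src/main/resources).

// Resolves only if the jar containing src/main/resources is on the classpath
// of the JVM that runs this code (the driver, in this case).
val stream = Option(getClass.getResourceAsStream("/config/app.conf"))

stream match {
  case Some(s) => println(scala.io.Source.fromInputStream(s).mkString)
  case None    => println("resource not found: is the application jar on the driver classpath?")
}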



RE: Strange bug: Filter problem with parenthesis

2016-04-14 Thread Saif.A.Ellafi
Appreciated, Michael, but this doesn't help my case: the filter string is submitted from outside my program. Is there any other alternative, some literal string parser or anything I can do beforehand?

Saif

From: Michael Armbrust [mailto:mich...@databricks.com]
Sent: Wednesday, April 13, 2016 6:29 PM
To: Ellafi, Saif A.
Cc: user
Subject: Re: Strange bug: Filter problem with parenthesis

You need to use `backticks` to reference columns that have non-standard 
characters.

On Wed, Apr 13, 2016 at 6:56 AM, 
> wrote:
Hi,

I am debugging a program, and for some reason, a line calling the following is 
failing:

df.filter("sum(OpenAccounts) > 5").show

It says it cannot find the column OpenAccounts, as if it were applying the sum() function and then looking for a column with that literal name, which does not exist. This works fine if I rename the column to something without parentheses.

I can't reproduce this issue in the Spark shell (1.6.0); any ideas on how I can analyze this? This is an aggregation result, with the default column names afterwards.

PS: A workaround is to use toDF(cols) and rename all columns, but I am wondering whether toDF has any impact on the underlying RDD structure (e.g. repartitioning, cache, etc.).

Appreciated,
Saif
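A hedged sketch of two ways to cope with externally supplied filter strings when aggregate columns get default names like sum(OpenAccounts); the grouping column "product" is a placeholder.

import org.apache.spark.sql.functions._

// Option 1: alias the aggregate so external filter strings can use a plain name.
val aliased = df.groupBy("product").agg(sum("OpenAccounts").as("sum_OpenAccounts"))
aliased.filter("sum_OpenAccounts > 5").show()

// Option 2: keep the default name but backtick-quote it inside the filter string
// (e.g. by pre-processing the incoming string before passing it to filter()).
val raw = df.groupBy("product").agg(sum("OpenAccounts"))
raw.filter("`sum(OpenAccounts)` > 5").show()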




Strange bug: Filter problem with parenthesis

2016-04-13 Thread Saif.A.Ellafi
Hi,

I am debugging a program, and for some reason, a line calling the following is 
failing:

df.filter("sum(OpenAccounts) > 5").show

It says it cannot find the column OpenAccounts, as if it were applying the sum() function and then looking for a column with that literal name, which does not exist. This works fine if I rename the column to something without parentheses.

I can't reproduce this issue in the Spark shell (1.6.0); any ideas on how I can analyze this? This is an aggregation result, with the default column names afterwards.

PS: A workaround is to use toDF(cols) and rename all columns, but I am wondering whether toDF has any impact on the underlying RDD structure (e.g. repartitioning, cache, etc.).

Appreciated,
Saif



Can we load csv partitioned data into one DF?

2016-02-22 Thread Saif.A.Ellafi
Hello all, I am facing a silly data question.

If I have 100+ csv files which are part of the same dataset, but each csv is, for example, one year of a timeframe column (i.e. partitioned by year), what would you suggest instead of loading all those files and joining them?

The final target would be parquet. Is it possible, for example, to load them, store them as parquet, and then read the parquet back and treat it all as one?

Thanks for any suggestions,
Saif
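A hedged sketch with the spark-csv package (paths, options and the glob pattern are placeholders): a glob loads all the yearly files as one DataFrame, which can then be written once as Parquet and re-read as a single dataset.

// Requires the com.databricks:spark-csv package on the classpath (Spark 1.x era).
val all = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/data/mydata/*.csv")          // one DataFrame over all ~100 files

all.write.parquet("/data/mydata_parquet")

// Later reads see a single logical dataset.
val unified = sqlContext.read.parquet("/data/mydata_parquet")
println(unified.count())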



jobs much slower in cluster mode vs local

2016-01-15 Thread Saif.A.Ellafi
Hello,

In general, I am able to run spark-submit jobs in local mode on a 32-core node with plenty of RAM. The performance is significantly faster in local mode than when using a cluster of Spark workers.

How can this be explained, and what measures can one take to improve such performance?
Usually a job that takes 35 seconds in local mode takes around 48 seconds on a small cluster.

Thanks,
Saif



RE: jobs much slower in cluster mode vs local

2016-01-15 Thread Saif.A.Ellafi
Thank you, this looks useful indeed for what I have in mind.

Saif

From: Jiří Syrový [mailto:syrovy.j...@gmail.com]
Sent: Friday, January 15, 2016 12:06 PM
To: Ellafi, Saif A.
Cc: user@spark.apache.org
Subject: Re: jobs much slower in cluster mode vs local

Hi,

You can try to use the Spark Job Server and submit jobs to it. The thing is that the most expensive part is context creation.
J.

2016-01-15 15:28 GMT+01:00 
>:
Hello,

In general, I am able to run spark-submit jobs in local mode on a 32-core node with plenty of RAM. The performance is significantly faster in local mode than when using a cluster of Spark workers.

How can this be explained, and what measures can one take to improve such performance?
Usually a job that takes 35 seconds in local mode takes around 48 seconds on a small cluster.

Thanks,
Saif




Big data job only finishes with Legacy memory management

2016-01-12 Thread Saif.A.Ellafi
Hello,

I am tinkering with Spark 1.6. I have a 1.5 billion row dataset, to which I apply several window functions such as lag, first, etc. The job is quite expensive; I am running a small cluster with executors at 70 GB of RAM each.

Using the new memory management system, the job fails around the middle with a heap memory limit exceeded problem. I also tried tinkering with several of the new memory settings, with no success. 70 GB * 4 nodes is a lot of resources for this kind of job.

Legacy memory management runs this job successfully with default memory settings.

How could I further analyze this problem to provide assistance and better diagnostics?
The whole job uses the DataFrame API, with nothing strange (no UDFs or custom operations).

Saif
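For reference, a hedged sketch of the Spark 1.6 settings being compared here; the numeric values are placeholders to experiment with, not recommendations.

import org.apache.spark.SparkConf

val conf = new SparkConf()
  // Fall back to the pre-1.6 (legacy) memory manager, as in the successful runs:
  .set("spark.memory.useLegacyMode", "true")
  // Or stay on the unified manager and widen its execution/storage pool instead:
  .set("spark.memory.fraction", "0.75")
  .set("spark.memory.storageFraction", "0.3")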



RE: Is Spark 1.6 released?

2016-01-04 Thread Saif.A.Ellafi
Where can I read more about the Dataset API at the user level? I am failing to find an API doc or to understand when to use DataFrame vs. Dataset, the advantages, etc.

Thanks,
Saif
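A minimal 1.6-style sketch of the practical difference, for readers with the same question (Person is a placeholder case class): a DataFrame is a collection of untyped Rows, while a Dataset keeps a compile-time element type.

case class Person(name: String, age: Int)

import sqlContext.implicits._

val df = Seq(Person("a", 30), Person("b", 12)).toDF()  // DataFrame: Row-based, columns checked at runtime
val ds = Seq(Person("a", 30), Person("b", 12)).toDS()  // Dataset[Person]: typed, available since 1.6

val adults = ds.filter(_.age >= 18)  // lambda over Person, checked at compile time
adults.show()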

-Original Message-
From: Jean-Baptiste Onofré [mailto:j...@nanthrax.net] 
Sent: Monday, January 04, 2016 2:01 PM
To: user@spark.apache.org
Subject: Re: Is Spark 1.6 released?

It's now OK: Michael published and announced the release.

Sorry for the delay.

Regards
JB

On 01/04/2016 10:06 AM, Jung wrote:
> Hi
> There were Spark 1.6 jars in maven central and github.
> I found it 5 days ago. But it doesn't appear on Spark website now.
> May I regard Spark 1.6 zip file in github as a stable release?
>
> Thanks
> Jung
>

--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com




Limit of application submission to cluster

2015-12-18 Thread Saif.A.Ellafi
Hello everyone,

I am testing some parallel program submission to a standalone cluster. Everything works all right; the problem is that, for some reason, I can't submit more than 3 programs to the cluster.
The fourth one, whether submitted through the legacy or the REST endpoint, simply hangs until one of the first three completes.
I am not sure how to debug this. I have tried increasing the number of connections per peer and the number of Akka threads, with no luck. Any ideas?

Thanks,
Saif



ML - LinearRegression: is this a bug ????

2015-12-02 Thread Saif.A.Ellafi
Data:

+---++
|   label|features|
+---++
|0.13271745268556925|[-0.2006809895664...|
|0.23956421080605234|[-0.0938342314459...|
|0.47464690691431843|[0.14124846466227...|
| 0.0941426858669834|[-0.2392557563850...|
|0.18127172833957172|[-0.1521267139124...|
| 0.4279981695794981|[0.09459972732745...|
|0.04648603521554342|[-0.2869124070364...|
| 0.4164836719056925|[0.08308522965365...|
|0.15519130823516833|[-0.1782071340168...|
|0.34583751349139175|[0.01243907123934...|
| 0.5732358988284585|[0.2398374565764162]|
|0.12352025893247957|[-0.2098781833195...|
|  0.672220700788423|[0.3388222585363807]|
|0.11796247818430779|[-0.2154359640677...|
|0.32647852580932724|[-0.0069199164427...|
|0.09211654339348248|[-0.2412818988585...|
| 0.4907542977669017|[0.15735585551485...|
| 0.3255888257160203|[-0.0078096165360...|
| 0.8542890157811815|[0.5208905735291393]|
| 0.1132558594215048|[-0.2201425828305...|
+---++
only showing top 20 rows

val model = lr.fit(data)
val predict_data = model.transform(data)

++---+---+
|features|   label|predicted_label|
++---+---+
|[-0.2006809895664...|0.13271745268556925|0.13271745268556925|
|[-0.0938342314459...|0.23956421080605234|0.23956421080605234|
|[0.14124846466227...|0.47464690691431843|0.47464690691431843|
|[-0.2392557563850...| 0.0941426858669834| 0.0941426858669834|
|[-0.1521267139124...|0.18127172833957172|0.18127172833957172|
|[0.09459972732745...| 0.4279981695794981| 0.4279981695794981|
|[-0.2869124070364...|0.04648603521554342| 0.0464860352155434|
|[0.08308522965365...| 0.4164836719056925| 0.4164836719056925|
|[-0.1782071340168...|0.15519130823516833|0.15519130823516833|
|[0.01243907123934...|0.34583751349139175|0.34583751349139175|
|[0.2398374565764162]| 0.5732358988284585| 0.5732358988284585|
|[-0.2098781833195...|0.12352025893247957|0.12352025893247959|
|[0.3388222585363807]|  0.672220700788423|  0.672220700788423|
|[-0.2154359640677...|0.11796247818430779|0.11796247818430777|
|[-0.0069199164427...|0.32647852580932724|0.32647852580932724|
|[-0.2412818988585...|0.09211654339348248|0.09211654339348246|
|[0.15735585551485...| 0.4907542977669017| 0.4907542977669017|
|[-0.0078096165360...| 0.3255888257160203| 0.3255888257160203|
|[0.5208905735291393]| 0.8542890157811815| 0.8542890157811815|
|[-0.2201425828305...| 0.1132558594215048|0.11325585942150479|
++---+---+
only showing top 20 rows

model.weights
res49: org.apache.spark.mllib.linalg.Vector = [1.0]

If, instead, I remove the intercept:

val zz = lrr.setFitIntercept(false).fit(vnt_data)
zz.transform(vnt_data).select(scnd_feat_col, scnd_lab_col, scnd_pred_col).show

++---++
|features|   label| predicted_label|
++---++
|[-0.2006809895664...|0.13271745268556925|-0.20472873432747501|
|[-0.0938342314459...|0.23956421080605234|-0.09572687219665929|
|[0.14124846466227...|0.47464690691431843|  0.1440974526709132|
|[-0.2392557563850...| 0.0941426858669834|-0.24408155596148765|
|[-0.1521267139124...|0.18127172833957172|-0.15519511670726388|
|[0.09459972732745...| 0.4279981695794981| 0.09650780816515314|
|[-0.2869124070364...|0.04648603521554342|-0.29269944344167753|
|[0.08308522965365...| 0.4164836719056925|  0.0847610625453144|
|[-0.1782071340168...|0.15519130823516833| -0.1818015800809893|
|[0.01243907123934...|0.34583751349139175|0.012689967876592361|
|[0.2398374565764162]| 0.5732358988284585| 0.24467498907237623|
|[-0.2098781833195...|0.12352025893247957|-0.21411143590026604|
|[0.3388222585363807]|  0.672220700788423|  0.3456563190264363|
|[-0.2154359640677...|0.11796247818430779| -0.2197813173409589|
|[-0.0069199164427...|0.32647852580932724|-0.00705949147465...|
|[-0.2412818988585...|0.09211654339348248|-0.24614856582157998|
|[0.15735585551485...| 0.4907542977669017| 0.16052973033553486|
|[-0.0078096165360...| 0.3255888257160203|-0.00796713685963...|
|[0.5208905735291393]| 0.8542890157811815|  0.5313969602806332|
|[-0.2201425828305...| 0.1132558594215048|-0.22458286882001133|
++---++
only showing top 20 rows

which makes much more sense.

Thanks for the help,
saif
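A note not from the thread: on the rows shown above, label minus feature appears to be a constant (~0.3334), i.e. the data is exactly affine in its single feature. In that case a perfect fit with an intercept (weight 1.0 plus intercept ~0.3334) is the expected outcome rather than a bug. A quick check, with the truncated feature digits padded with zeros as an assumption:

// If label - feature is (near) constant across rows, the intercept model can
// reproduce the labels exactly, which matches the first table above.
val pairs = Seq(
  (0.13271745268556925, -0.2006809895664),  // digits beyond the "..." are unknown
  (0.23956421080605234, -0.0938342314459),
  (0.47464690691431843,  0.14124846466227)
)
pairs.foreach { case (label, feature) => println(label - feature) } // ~0.3334 each time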



RE: Inconsistent Persistence of DataFrames in Spark 1.5

2015-10-28 Thread Saif.A.Ellafi
Hi, just a couple of cents.

Are your join columns StringType (the id field)? I have recently reported a bug where I got inconsistent results when filtering String fields in group operations.
Saif

From: Colin Alstad [mailto:colin.als...@pokitdok.com]
Sent: Wednesday, October 28, 2015 12:39 PM
To: user@spark.apache.org
Subject: Inconsistent Persistence of DataFrames in Spark 1.5

We recently switched to Spark 1.5.0 from 1.4.1 and have noticed some 
inconsistent behavior in persisting DataFrames.

df1 = sqlContext.read.parquet("df1.parquet")
df1.count()
> 161,100,982

df2 = sqlContext.read.parquet("df2.parquet")
df2.count()
> 67,498,706

join_df = df1.join(df2, 'id')
join_df.count()
> 160,608,147

join_df.write.parquet("join.parquet")
join_parquet = sqlContext.read.parquet("join.parquet")
join_parquet.count()
> 67,698,892

join_df.write.json("join.json")
join_json = sqlContext.read.parquet("join.json")
join_json.count()
> 67,695,663
> 67,695,663

The first major issue is that there is an order of magnitude difference between 
the count of the join DataFrame and the persisted join DataFrame.  Secondly, 
persisting the same DataFrame into 2 different formats yields different results.

Does anyone have any idea on what could be going on here?

--
Colin Alstad
Data Scientist
colin.als...@pokitdok.com



Kryo makes String data invalid

2015-10-26 Thread Saif.A.Ellafi
Hi all,

I have a parquet file, which I am loading in a shell. When I launch the shell with --driver-java-options="-Dspark.serializer=...kryo", a couple of fields end up looking like:

03-?? ??-?? ??-???
when calling data.first.

I will confirm shortly, but I am fairly sure it happens only on StringType fields.

Why could this be happening? Perhaps Kryo wasn't set up when the parquet file was created from Spark?

If I disable Kryo, the data looks good.

Any ideas?
Saif
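For reference, a short hedged note on the setting elided above: the value normally given to spark.serializer is the full class name shown below (whether the serializer can explain the corrupted StringType display is exactly the open question in this thread).

import org.apache.spark.SparkConf

// Full class name behind the "-Dspark.serializer=..." flag mentioned above.
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")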



Results change in group by operation

2015-10-26 Thread Saif.A.Ellafi
Hello Everyone,

I would need urgent help with a data consistency issue I am having.
Standalone cluster of five servers. sqlContext is an instance of HiveContext (the default in spark-shell).
No special options other than driver memory and executor memory.
Parquet partitions are 512 where there are 160 cores.
Data is nearly 2 billion rows.
The issue happens as follows:

val data = sqlContext.read.parquet("/var/Saif/data_pqt")

val res = data.groupBy("product", "band", "age", "vint", "mb", "mm")
  .agg(count($"account_id").as("N"),
       sum($"balance").as("balance_eom"),
       sum($"balancec").as("balance_eoc"),
       sum($"spend").as("spend"),
       sum($"payment").as("payment"),
       sum($"feoc").as("feoc"),
       sum($"cfintbal").as("cfintbal"),
       count($"newacct" === 1).as("newacct"))
  .persist()

val z = res.select("vint", "mm").filter("vint = '2007-01-01'").select("mm").distinct.collect

z.length

>>> res0: Int = 102

res.unpersist()

val z = res.select("vint", "mm").filter("vint = '2007-01-01'").select("mm").distinct.collect

z.length

>>> res1: Int = 103

Please help,
Saif







Spark groupby and agg inconsistent and missing data

2015-10-22 Thread Saif.A.Ellafi
Hello everyone,

I am doing some analytics experiments on a 4-server standalone cluster in a spark shell, mostly involving a huge dataset with groupBy and aggregations.

I am picking 6 groupBy columns and returning various aggregated results in a dataframe. The groupBy fields are of two types: most of them are StringType and the rest are LongType.

The data source is a dataframe read from split JSON files. Once the data is persisted, the result is consistent. But if I unload the memory and reload the data, the groupBy action returns different content results, with missing data.

Could I be missing something? This is rather serious for my analytics, and I am not sure how to properly diagnose this situation.

Thanks,
Saif



RE: Spark groupby and agg inconsistent and missing data

2015-10-22 Thread Saif.A.Ellafi
Thanks; sorry, I cannot share the data, and I am not sure how significant it would be for you.
I am reproducing the issue on a smaller piece of the content to see whether I can find a reason for the inconsistency.

val res2 = data.filter($"closed" === $"ever_closed")
  .groupBy("product", "band ", "aget", "vine", "time", "mm")
  .agg(count($"account_id").as("N"),
       sum($"balance").as("balance"),
       sum($"spend").as("spend"),
       sum($"payment").as("payment"))
  .persist()

then I collect distinct values of “vine” (which is StringType) both from data 
and res2, and res2 is missing a lot of values:

val t1 = res2.select("vine").distinct.collect
scala> t1.size
res10: Int = 617

val t_real = data.select("vine").distinct.collect
scala> t_real.size
res9: Int = 639


From: Xiao Li [mailto:gatorsm...@gmail.com]
Sent: Thursday, October 22, 2015 12:45 PM
To: Ellafi, Saif A.
Cc: user
Subject: Re: Spark groupby and agg inconsistent and missing data

Hi, Saif,

Could you post your code here? It might help others reproduce the errors and 
give you a correct answer.

Thanks,

Xiao Li

2015-10-22 8:27 GMT-07:00 
>:
Hello everyone,

I am doing some analytics experiments on a 4-server standalone cluster in a spark shell, mostly involving a huge dataset with groupBy and aggregations.

I am picking 6 groupBy columns and returning various aggregated results in a dataframe. The groupBy fields are of two types: most of them are StringType and the rest are LongType.

The data source is a dataframe read from split JSON files. Once the data is persisted, the result is consistent. But if I unload the memory and reload the data, the groupBy action returns different content results, with missing data.

Could I be missing something? This is rather serious for my analytics, and I am not sure how to properly diagnose this situation.

Thanks,
Saif




RE: Spark groupby and agg inconsistent and missing data

2015-10-22 Thread Saif.A.Ellafi
Never mind my last email: res2 is filtered, so my test does not make sense. The issue is not reproduced there; I have the problem somewhere else.

From: Ellafi, Saif A.
Sent: Thursday, October 22, 2015 12:57 PM
To: 'Xiao Li'
Cc: user
Subject: RE: Spark groupby and agg inconsistent and missing data

Thanks; sorry, I cannot share the data, and I am not sure how significant it would be for you.
I am reproducing the issue on a smaller piece of the content to see whether I can find a reason for the inconsistency.

val res2 = data.filter($"closed" === $"ever_closed").groupBy("product", "band 
", "aget", "vine", "time", "mm").agg(count($"account_id").as("N"), 
sum($"balance").as("balance"), sum($"spend").as("spend"), 
sum($"payment").as("payment")).persist()

then I collect distinct values of “vine” (which is StringType) both from data 
and res2, and res2 is missing a lot of values:

val t1 = res2.select("vine").distinct.collect
scala> t1.size
res10: Int = 617

val t_real = data.select("vine").distinct.collect
scala> t_real.size
res9: Int = 639


From: Xiao Li [mailto:gatorsm...@gmail.com]
Sent: Thursday, October 22, 2015 12:45 PM
To: Ellafi, Saif A.
Cc: user
Subject: Re: Spark groupby and agg inconsistent and missing data

Hi, Saif,

Could you post your code here? It might help others reproduce the errors and 
give you a correct answer.

Thanks,

Xiao Li

2015-10-22 8:27 GMT-07:00 
>:
Hello everyone,

I am doing some analytics experiments on a 4-server standalone cluster in a spark shell, mostly involving a huge dataset with groupBy and aggregations.

I am picking 6 groupBy columns and returning various aggregated results in a dataframe. The groupBy fields are of two types: most of them are StringType and the rest are LongType.

The data source is a dataframe read from split JSON files. Once the data is persisted, the result is consistent. But if I unload the memory and reload the data, the groupBy action returns different content results, with missing data.

Could I be missing something? This is rather serious for my analytics, and I am not sure how to properly diagnose this situation.

Thanks,
Saif




How to speed up reading from file?

2015-10-16 Thread Saif.A.Ellafi
Hello,

Is there an optimal number of partitions per number of rows when writing to disk, so that we can later re-read from the source in a distributed way?
Any thoughts?

Thanks
Saif
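There is no single official number; a hedged rule-of-thumb sketch (the target file size and the per-row estimate are placeholder assumptions) is to aim for a fixed output file size and derive the partition count from it.

// Aim for roughly 128-256 MB per output file; estimate bytes/row for your schema.
val targetBytesPerFile = 256L * 1024 * 1024
val approxBytesPerRow  = 200L                      // placeholder estimate
val rows = df.count()

val numFiles = math.max(1, (rows * approxBytesPerRow / targetBytesPerFile).toInt)
df.repartition(numFiles).write.parquet("/path/out") // placeholder path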



thriftserver: access temp dataframe from in-memory of spark-shell

2015-10-14 Thread Saif.A.Ellafi
Hi,

Is it possible to load a spark-shell, do any number of operations on a dataframe in it, register the result as a temporary table, and then see it through the Thrift server?
P.S.: or even better, submit a full job and keep the dataframe in the Thrift server's memory before the job completes.

I have been trying this without success; beeline does not see the dataframes of the spark-shell's Hive context.
If any of you confirms this is possible, I will investigate further. So far it only seems to be possible to manually read from persistent tables.

Thanks for any insights,
Saif
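A hedged sketch of the approach usually suggested for this (it assumes a build that includes the Thrift-server classes; the path and table names are placeholders): start the Thrift server inside the same JVM and context that holds the temporary table, so a beeline session pointed at it can see the table.

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2

val hc = sqlContext.asInstanceOf[HiveContext]   // the spark-shell context, when built with Hive

val df = hc.read.parquet("/path/to/data")       // any DataFrame built interactively
df.registerTempTable("my_temp_table")           // temp tables live only in this context

HiveThriftServer2.startWithContext(hc)          // expose this context over JDBC/beeline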



Spark shuffle service does not work in stand alone

2015-10-13 Thread Saif.A.Ellafi
Has anyone tried the shuffle service in standalone cluster mode? I want to enable it for dynamic allocation, but my jobs never start when I submit them.
This happens with all my jobs.



15/10/13 08:29:45 INFO DAGScheduler: Job 0 failed: json at DataLoader.scala:86, 
took 16.318615 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to 
stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost 
task 0.3 in stage 0.0 (TID 7, 162.101.194.47): ExecutorLostFailure (executor 4 
lost)
Driver stacktrace:
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at scala.Option.foreach(Option.scala:236)
at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at 
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1822)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1942)
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1003)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
at org.apache.spark.rdd.RDD.reduce(RDD.scala:985)
at 
org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1114)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1091)
at 
org.apache.spark.sql.execution.datasources.json.InferSchema$.apply(InferSchema.scala:58)
at 
org.apache.spark.sql.execution.datasources.json.JSONRelation$$anonfun$6.apply(JSONRelation.scala:105)
at 
org.apache.spark.sql.execution.datasources.json.JSONRelation$$anonfun$6.apply(JSONRelation.scala:100)
at scala.Option.getOrElse(Option.scala:120)
at 
org.apache.spark.sql.execution.datasources.json.JSONRelation.dataSchema$lzycompute(JSONRelation.scala:100)
at 
org.apache.spark.sql.execution.datasources.json.JSONRelation.dataSchema(JSONRelation.scala:99)
at 
org.apache.spark.sql.sources.HadoopFsRelation.schema$lzycompute(interfaces.scala:561)
at 
org.apache.spark.sql.sources.HadoopFsRelation.schema(interfaces.scala:560)
at 
org.apache.spark.sql.execution.datasources.LogicalRelation.(LogicalRelation.scala:31)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:120)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:104)
at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:219)
at org.apache.saif.loaders.DataLoader$.load_json(DataLoader.scala:86)




RE: Spark shuffle service does not work in stand alone

2015-10-13 Thread Saif.A.Ellafi
Hi, thanks

Executors are simply failing to connect to a shuffle server:

15/10/13 08:29:34 INFO BlockManagerMaster: Registered BlockManager
15/10/13 08:29:34 INFO BlockManager: Registering executor with local external 
shuffle service.
15/10/13 08:29:34 ERROR BlockManager: Failed to connect to external shuffle 
server, will retry 2 more times after waiting 5 seconds...
java.io.IOException: Failed to connect to /162.xxx.zzz.yy:port
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
at 
org.apache.spark.network.shuffle.ExternalShuffleClient.registerWithShuffleServer(ExternalShuffleClient.java:140)
at 
org.apache.spark.storage.BlockManager$$anonfun$registerWithExternalShuffleServer$1.apply$mcVI$sp(BlockManager.scala:220)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at 
org.apache.spark.storage.BlockManager.registerWithExternalShuffleServer(BlockManager.scala:217)
at 
org.apache.spark.storage.BlockManager.initialize(BlockManager.scala:203)
at org.apache.spark.executor.Executor.(Executor.scala:85)
at 
org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:86)
at 
org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$processMessage(AkkaRpcEnv.scala:177)
at 
org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.apply$mcV$sp(AkkaRpcEnv.scala:126)
at 
org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$safelyCall(AkkaRpcEnv.scala:197)
at 
org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1.applyOrElse(AkkaRpcEnv.scala:125)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at 
org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59)
at 
org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
at 
org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at 
org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1.aroundReceive(AkkaRpcEnv.scala:92)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.net.ConnectException: Connection refused: /162.xxx.zzz.yy:port
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at 
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at 
io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
at 
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at 
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:745)


From: Marcelo Vanzin [mailto:van...@cloudera.com]
Sent: Tuesday, October 13, 2015 1:13 PM
To: Ellafi, Saif A.
Cc: user@spark.apache.org
Subject: Re: Spark shuffle service does not work in stand alone

It would probably be more helpful if you looked for the executor error and 
posted it. The screenshot you posted is the driver exception caused by the task 
failure, which is not terribly useful.

On Tue, Oct 13, 2015 at 7:23 AM, 
> wrote:
Has anyone tried 

RE: Spark shuffle service does not work in stand alone

2015-10-13 Thread Saif.A.Ellafi
I believe the confusion here is self-answered.
The thing is that, in the documentation, the Spark shuffle service runs only under YARN, while here we are talking about a standalone cluster.

The proper question is: how do I launch a shuffle service for standalone?

Saif

From: saif.a.ell...@wellsfargo.com [mailto:saif.a.ell...@wellsfargo.com]
Sent: Tuesday, October 13, 2015 2:25 PM
To: van...@cloudera.com
Cc: user@spark.apache.org
Subject: RE: Spark shuffle service does not work in stand alone

Hi, thanks

Executors are simply failing to connect to a shuffle server:

15/10/13 08:29:34 INFO BlockManagerMaster: Registered BlockManager
15/10/13 08:29:34 INFO BlockManager: Registering executor with local external 
shuffle service.
15/10/13 08:29:34 ERROR BlockManager: Failed to connect to external shuffle 
server, will retry 2 more times after waiting 5 seconds...
java.io.IOException: Failed to connect to /162.xxx.zzz.yy:port
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
at 
org.apache.spark.network.shuffle.ExternalShuffleClient.registerWithShuffleServer(ExternalShuffleClient.java:140)
at 
org.apache.spark.storage.BlockManager$$anonfun$registerWithExternalShuffleServer$1.apply$mcVI$sp(BlockManager.scala:220)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at 
org.apache.spark.storage.BlockManager.registerWithExternalShuffleServer(BlockManager.scala:217)
at 
org.apache.spark.storage.BlockManager.initialize(BlockManager.scala:203)
at org.apache.spark.executor.Executor.(Executor.scala:85)
at 
org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:86)
at 
org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$processMessage(AkkaRpcEnv.scala:177)
at 
org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.apply$mcV$sp(AkkaRpcEnv.scala:126)
at 
org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$safelyCall(AkkaRpcEnv.scala:197)
at 
org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1.applyOrElse(AkkaRpcEnv.scala:125)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at 
org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59)
at 
org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
at 
org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at 
org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1.aroundReceive(AkkaRpcEnv.scala:92)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.net.ConnectException: Connection refused: /162.xxx.zzz.yy:port
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at 
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at 
io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
at 
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at 
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:745)


From: Marcelo Vanzin 

RE: Spark shuffle service does not work in stand alone

2015-10-13 Thread Saif.A.Ellafi
Thanks, I missed that one.

From: Marcelo Vanzin [mailto:van...@cloudera.com]
Sent: Tuesday, October 13, 2015 2:36 PM
To: Ellafi, Saif A.
Cc: user@spark.apache.org
Subject: Re: Spark shuffle service does not work in stand alone

You have to manually start the shuffle service if you're not running YARN. See 
the "sbin/start-shuffle-service.sh" script.

On Tue, Oct 13, 2015 at 10:29 AM, 
> wrote:
I believe the confusion here is self-answered.
The thing is that, in the documentation, the Spark shuffle service runs only under YARN, while here we are talking about a standalone cluster.

The proper question is: how do I launch a shuffle service for standalone?

Saif

From: saif.a.ell...@wellsfargo.com 
[mailto:saif.a.ell...@wellsfargo.com]
Sent: Tuesday, October 13, 2015 2:25 PM
To: van...@cloudera.com
Cc: user@spark.apache.org
Subject: RE: Spark shuffle service does not work in stand alone

Hi, thanks

Executors are simply failing to connect to a shuffle server:

15/10/13 08:29:34 INFO BlockManagerMaster: Registered BlockManager
15/10/13 08:29:34 INFO BlockManager: Registering executor with local external 
shuffle service.
15/10/13 08:29:34 ERROR BlockManager: Failed to connect to external shuffle 
server, will retry 2 more times after waiting 5 seconds...
java.io.IOException: Failed to connect to /162.xxx.zzz.yy:port
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
at 
org.apache.spark.network.shuffle.ExternalShuffleClient.registerWithShuffleServer(ExternalShuffleClient.java:140)
at 
org.apache.spark.storage.BlockManager$$anonfun$registerWithExternalShuffleServer$1.apply$mcVI$sp(BlockManager.scala:220)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at 
org.apache.spark.storage.BlockManager.registerWithExternalShuffleServer(BlockManager.scala:217)
at 
org.apache.spark.storage.BlockManager.initialize(BlockManager.scala:203)
at org.apache.spark.executor.Executor.(Executor.scala:85)
at 
org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:86)
at 
org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$processMessage(AkkaRpcEnv.scala:177)
at 
org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.apply$mcV$sp(AkkaRpcEnv.scala:126)
at 
org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$safelyCall(AkkaRpcEnv.scala:197)
at 
org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1.applyOrElse(AkkaRpcEnv.scala:125)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at 
org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59)
at 
org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
at 
org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at 
org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1.aroundReceive(AkkaRpcEnv.scala:92)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.net.ConnectException: Connection refused: /162.xxx.zzz.yy:port
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at 
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at 

Best storage format for intermediate process

2015-10-09 Thread Saif.A.Ellafi
Hi all,

I am in the process of learning big data.
Right now I am bringing huge databases into Spark through JDBC (a 250 million row table can take around 3 hours), and then re-saving them as JSON, which is fast, simple, distributed, fail-safe and preserves data types, although without any compression.

Reading this amount of data back from distributed JSON takes around 2-3 minutes and works well enough for me. But do you suggest or prefer any other format for intermediate storage, for fast reads with proper types?
Not only as an intermediate step after a network database, but also for intermediate dataframe transformations, to have data ready for processing.

I have tried CSV, but type inference does not usually fit my needs and takes a long time. I haven't tried parquet since they fixed it for 1.5, but that is also another option.
What do you also think of HBase, Hive or any other format?

Looking for insights!
Saif



RE: How to calculate percentile of a column of DataFrame?

2015-10-09 Thread Saif.A.Ellafi
Where can we find other available functions such as lit()? I can't find lit in the API.

Thanks

From: Michael Armbrust [mailto:mich...@databricks.com]
Sent: Friday, October 09, 2015 4:04 PM
To: unk1102
Cc: user
Subject: Re: How to calculate percentile of a column of DataFrame?

You can use callUDF("percentile_approx", col("mycol"), lit(0.25)) to call Hive UDFs from dataframes.
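A hedged, self-contained version of that suggestion (df and "mycol" are placeholders; a HiveContext is assumed so the Hive UDAF resolves):

import org.apache.spark.sql.functions._

val p25 = df.agg(callUDF("percentile_approx", col("mycol"), lit(0.25)).as("p25"))
p25.show()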

On Fri, Oct 9, 2015 at 12:01 PM, unk1102 
> wrote:
Hi, how do I calculate the percentile of a column in a DataFrame? I can't find any
percentile_approx function among Spark's aggregation functions. E.g. in Hive
we have percentile_approx and we can use it in the following way:

hiveContext.sql("select percentile_approx(mycol, 0.25) from myTable")

I can see the ntile function, but I'm not sure it will give the same results as
the above query; please guide.






RE: How to calculate percentile of a column of DataFrame?

2015-10-09 Thread Saif.A.Ellafi
Yes, but I mean, this is rather curious: how does def lit(literal: Any) end up acting as the percentile argument, e.g. lit(0.25)?

Thanks for clarification
Saif

From: Umesh Kacha [mailto:umesh.ka...@gmail.com]
Sent: Friday, October 09, 2015 4:10 PM
To: Ellafi, Saif A.
Cc: Michael Armbrust; user
Subject: Re: How to calculate percentile of a column of DataFrame?

I found it in the 1.3 documentation; lit says something else, not a percentile:


public static Column lit(Object literal)
Creates a Column of literal value.

The passed in object is returned directly if it is already a Column. If the object is a Scala Symbol, it is converted into a Column also. Otherwise, a new Column is created to represent the literal value.

On Sat, Oct 10, 2015 at 12:39 AM, 
> wrote:
Where can we find other available functions such as lit() ? I can’t find lit in 
the api.

Thanks

From: Michael Armbrust 
[mailto:mich...@databricks.com]
Sent: Friday, October 09, 2015 4:04 PM
To: unk1102
Cc: user
Subject: Re: How to calculate percentile of a column of DataFrame?

You can use callUDF("percentile_approx", col("mycol"), lit(0.25)) to call Hive UDFs from dataframes.

On Fri, Oct 9, 2015 at 12:01 PM, unk1102 
> wrote:
Hi, how do I calculate the percentile of a column in a DataFrame? I can't find any
percentile_approx function among Spark's aggregation functions. E.g. in Hive
we have percentile_approx and we can use it in the following way:

hiveContext.sql("select percentile_approx(mycol, 0.25) from myTable")

I can see the ntile function, but I'm not sure it will give the same results as
the above query; please guide.







RowNumber in HiveContext returns null or negative values

2015-10-08 Thread Saif.A.Ellafi
Hi all, would this be a bug??

val ws = Window.
partitionBy("clrty_id").
orderBy("filemonth_dtt")

val nm = "repeatMe"
df.select(df.col("*"), rowNumber().over(ws).cast("int").as(nm))


stacked_data.filter(stacked_data("repeatMe").isNotNull).orderBy("repeatMe").take(50).foreach(println(_))

--->

Long, DateType, Int
[2003,2006-06-01,-1863462909]
[2003,2006-09-01,-1863462909]
[2003,2007-01-01,-1863462909]
[2003,2007-08-01,-1863462909]
[2003,2007-07-01,-1863462909]
[2138,2007-07-01,-1863462774]
[2138,2007-02-01,-1863462774]
[2138,2006-11-01,-1863462774]
[2138,2006-08-01,-1863462774]
[2138,2007-08-01,-1863462774]
[2138,2006-09-01,-1863462774]
[2138,2007-03-01,-1863462774]
[2138,2006-10-01,-1863462774]
[2138,2007-05-01,-1863462774]
[2138,2006-06-01,-1863462774]
[2138,2006-12-01,-1863462774]


Thanks,
Saif



RE: RowNumber in HiveContext returns null or negative values

2015-10-08 Thread Saif.A.Ellafi
Hi, thanks for looking into it. v1.5.1. I am really worried.
I don't have Hive/Hadoop for real in the environment.

Saif

From: Michael Armbrust [mailto:mich...@databricks.com]
Sent: Thursday, October 08, 2015 2:57 PM
To: Ellafi, Saif A.
Cc: user
Subject: Re: RowNumber in HiveContext returns null or negative values

Which version of Spark?

On Thu, Oct 8, 2015 at 7:25 AM, 
> wrote:
Hi all, would this be a bug??

val ws = Window.
partitionBy("clrty_id").
orderBy("filemonth_dtt")

val nm = "repeatMe"
df.select(df.col("*"), rowNumber().over(ws).cast("int").as(nm))


stacked_data.filter(stacked_data("repeatMe").isNotNull).orderBy("repeatMe").take(50).foreach(println(_))

--->

Long, DateType, Int
[2003,2006-06-01,-1863462909]
[2003,2006-09-01,-1863462909]
[2003,2007-01-01,-1863462909]
[2003,2007-08-01,-1863462909]
[2003,2007-07-01,-1863462909]
[2138,2007-07-01,-1863462774]
[2138,2007-02-01,-1863462774]
[2138,2006-11-01,-1863462774]
[2138,2006-08-01,-1863462774]
[2138,2007-08-01,-1863462774]
[2138,2006-09-01,-1863462774]
[2138,2007-03-01,-1863462774]
[2138,2006-10-01,-1863462774]
[2138,2007-05-01,-1863462774]
[2138,2006-06-01,-1863462774]
[2138,2006-12-01,-1863462774]


Thanks,
Saif




RE: RowNumber in HiveContext returns null or negative values

2015-10-08 Thread Saif.A.Ellafi
It turns out this does not happen in local[32] mode; it only happens when submitting to the standalone cluster. I don't have YARN/Mesos to compare.

Will keep diagnosing.

Saif

From: saif.a.ell...@wellsfargo.com [mailto:saif.a.ell...@wellsfargo.com]
Sent: Thursday, October 08, 2015 3:01 PM
To: mich...@databricks.com
Cc: user@spark.apache.org
Subject: RE: RowNumber in HiveContext returns null or negative values

Hi, thanks for looking into it. v1.5.1. I am really worried.
I don't have Hive/Hadoop for real in the environment.

Saif

From: Michael Armbrust [mailto:mich...@databricks.com]
Sent: Thursday, October 08, 2015 2:57 PM
To: Ellafi, Saif A.
Cc: user
Subject: Re: RowNumber in HiveContext returns null or negative values

Which version of Spark?

On Thu, Oct 8, 2015 at 7:25 AM, 
> wrote:
Hi all, would this be a bug??

val ws = Window.
partitionBy("clrty_id").
orderBy("filemonth_dtt")

val nm = "repeatMe"
df.select(df.col("*"), rowNumber().over(ws).cast("int").as(nm))


stacked_data.filter(stacked_data("repeatMe").isNotNull).orderBy("repeatMe").take(50).foreach(println(_))

--->

Long, DateType, Int
[2003,2006-06-01,-1863462909]
[2003,2006-09-01,-1863462909]
[2003,2007-01-01,-1863462909]
[2003,2007-08-01,-1863462909]
[2003,2007-07-01,-1863462909]
[2138,2007-07-01,-1863462774]
[2138,2007-02-01,-1863462774]
[2138,2006-11-01,-1863462774]
[2138,2006-08-01,-1863462774]
[2138,2007-08-01,-1863462774]
[2138,2006-09-01,-1863462774]
[2138,2007-03-01,-1863462774]
[2138,2006-10-01,-1863462774]
[2138,2007-05-01,-1863462774]
[2138,2006-06-01,-1863462774]
[2138,2006-12-01,-1863462774]


Thanks,
Saif




RE: RowNumber in HiveContext returns null or negative values

2015-10-08 Thread Saif.A.Ellafi
Setting repartition and default parallelism to 1 in cluster mode is still broken.

So the problem is not the parallelism, but cluster mode itself. Something is wrong with HiveContext + cluster mode.

Saif

From: saif.a.ell...@wellsfargo.com [mailto:saif.a.ell...@wellsfargo.com]
Sent: Thursday, October 08, 2015 3:01 PM
To: mich...@databricks.com
Cc: user@spark.apache.org
Subject: RE: RowNumber in HiveContext returns null or negative values

Hi, thanks for looking into it. v1.5.1. I am really worried.
I don't have Hive/Hadoop for real in the environment.

Saif

From: Michael Armbrust [mailto:mich...@databricks.com]
Sent: Thursday, October 08, 2015 2:57 PM
To: Ellafi, Saif A.
Cc: user
Subject: Re: RowNumber in HiveContext returns null or negative values

Which version of Spark?

On Thu, Oct 8, 2015 at 7:25 AM, 
> wrote:
Hi all, would this be a bug??

val ws = Window.
partitionBy("clrty_id").
orderBy("filemonth_dtt")

val nm = "repeatMe"
df.select(df.col("*"), rowNumber().over(ws).cast("int").as(nm))


stacked_data.filter(stacked_data("repeatMe").isNotNull).orderBy("repeatMe").take(50).foreach(println(_))

--->

Long, DateType, Int
[2003,2006-06-01,-1863462909]
[2003,2006-09-01,-1863462909]
[2003,2007-01-01,-1863462909]
[2003,2007-08-01,-1863462909]
[2003,2007-07-01,-1863462909]
[2138,2007-07-01,-1863462774]
[2138,2007-02-01,-1863462774]
[2138,2006-11-01,-1863462774]
[2138,2006-08-01,-1863462774]
[2138,2007-08-01,-1863462774]
[2138,2006-09-01,-1863462774]
[2138,2007-03-01,-1863462774]
[2138,2006-10-01,-1863462774]
[2138,2007-05-01,-1863462774]
[2138,2006-06-01,-1863462774]
[2138,2006-12-01,-1863462774]


Thanks,
Saif




RE: Spark standalone hangup during shuffle flatMap or explode in cluster

2015-10-07 Thread Saif.A.Ellafi
It can be large, yes. But that still does not answer the question of why it works in a smaller environment, i.e. local[32], or in cluster mode when using SQLContext instead of HiveContext.

The process, in general, is a rowNumber() HiveQL operation; that is why I need HiveContext.

I have the feeling there is something wrong with HiveContext. I don't have a Hive/Hadoop database; I only enabled HiveContext to use its functions on my JSON-loaded dataframe.

I am new to Spark; please don't hesitate to ask for more information, as I am still not sure what would be relevant.

Saif

-Original Message-
From: Sean Owen [mailto:so...@cloudera.com] 
Sent: Wednesday, October 07, 2015 2:38 PM
To: Ellafi, Saif A.
Cc: user
Subject: Re: Spark standalone hangup during shuffle flatMap or explode in 
cluster

-dev

Is r.getInt(ind) very large in some cases? I think there's not quite enough 
info here.

On Wed, Oct 7, 2015 at 6:23 PM,   wrote:
> When running stand-alone cluster mode job, the process hangs up 
> randomly during a DataFrame flatMap or explode operation, in HiveContext:
>
> -->> df.flatMap(r => for (n <- 1 to r.getInt(ind)) yield r)
>
> This does not happen either with SQLContext in cluster, or Hive/SQL in 
> local mode, where it works fine.
>
> A couple of minutes after the hangup, executors start dropping. I am 
> attaching the logs. Saif
>
>
>
>
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For 
> additional commands, e-mail: user-h...@spark.apache.org


Spark standalone hangup during shuffle flatMap or explode in cluster

2015-10-07 Thread Saif.A.Ellafi
When running a standalone cluster mode job, the process hangs up randomly during 
a DataFrame flatMap or explode operation, in HiveContext:

-->> df.flatMap(r => for (n <- 1 to r.getInt(ind)) yield r)

This does not happen either with SQLContext in cluster, or Hive/SQL in local 
mode, where it works fine.

A couple of minutes after the hangup, executors start dropping. I am attaching the 
logs

Saif




15/10/07 12:15:19 INFO TaskSetManager: Finished task 50.0 in stage 17.0 (TID 
166) in 2511 ms on 162.101.194.47 (180/200)
15/10/07 12:15:19 INFO TaskSetManager: Finished task 66.0 in stage 17.0 (TID 
182) in 2510 ms on 162.101.194.47 (181/200)
15/10/07 12:15:19 INFO TaskSetManager: Finished task 110.0 in stage 17.0 (TID 
226) in 2505 ms on 162.101.194.47 (182/200)
15/10/07 12:15:19 INFO TaskSetManager: Finished task 74.0 in stage 17.0 (TID 
190) in 2530 ms on 162.101.194.47 (183/200)
15/10/07 12:15:19 INFO TaskSetManager: Finished task 106.0 in stage 17.0 (TID 
222) in 2530 ms on 162.101.194.47 (184/200)
15/10/07 12:20:01 WARN HeartbeatReceiver: Removing executor 2 with no recent 
heartbeats: 141447 ms exceeds timeout 12 ms
15/10/07 12:20:01 ERROR TaskSchedulerImpl: Lost executor 2 on 162.101.194.44: 
Executor heartbeat timed out after 141447 ms
15/10/07 12:20:01 INFO TaskSetManager: Re-queueing tasks for 2 from TaskSet 17.0
15/10/07 12:20:01 WARN TaskSetManager: Lost task 113.0 in stage 17.0 (TID 229, 
162.101.194.44): ExecutorLostFailure (executor 2 lost)
15/10/07 12:20:01 WARN TaskSetManager: Lost task 73.0 in stage 17.0 (TID 189, 
162.101.194.44): ExecutorLostFailure (executor 2 lost)
15/10/07 12:20:01 WARN TaskSetManager: Lost task 81.0 in stage 17.0 (TID 197, 
162.101.194.44): ExecutorLostFailure (executor 2 lost)
15/10/07 12:20:01 INFO TaskSetManager: Starting task 81.1 in stage 17.0 (TID 
316, 162.101.194.45, PROCESS_LOCAL, 2045 bytes)
15/10/07 12:20:01 INFO TaskSetManager: Starting task 73.1 in stage 17.0 (TID 
317, 162.101.194.44, PROCESS_LOCAL, 2045 bytes)
15/10/07 12:20:01 INFO TaskSetManager: Starting task 113.1 in stage 17.0 (TID 
318, 162.101.194.48, PROCESS_LOCAL, 2045 bytes)
15/10/07 12:20:01 INFO SparkDeploySchedulerBackend: Requesting to kill 
executor(s) 2
15/10/07 12:20:01 INFO DAGScheduler: Executor lost: 2 (epoch 4)
15/10/07 12:20:01 INFO BlockManagerMasterEndpoint: Trying to remove executor 2 
from BlockManagerMaster.
15/10/07 12:20:01 INFO BlockManagerMasterEndpoint: Removing block manager 
BlockManagerId(2, 162.101.194.44, 42537)
15/10/07 12:20:01 INFO BlockManagerMaster: Removed 2 successfully in 
removeExecutor
15/10/07 12:20:01 INFO ShuffleMapStage: ShuffleMapStage 15 is now unavailable 
on executor 2 (1/2, false)
15/10/07 12:20:01 INFO ShuffleMapStage: ShuffleMapStage 16 is now unavailable 
on executor 2 (8/16, false)
15/10/07 12:20:01 INFO DAGScheduler: Host added was in lost list earlier: 
162.101.194.44
15/10/07 12:20:01 INFO AppClient$ClientEndpoint: Executor added: 
app-20151007121501-0022/69 on worker-20151007063932-162.101.194.44-57091 
(162.101.194.44:57091) with 32 cores
15/10/07 12:20:01 INFO SparkDeploySchedulerBackend: Granted executor ID 
app-20151007121501-0022/69 on hostPort 162.101.194.44:57091 with 32 cores, 
100.0 GB RAM
15/10/07 12:20:01 INFO AppClient$ClientEndpoint: Executor updated: 
app-20151007121501-0022/69 is now RUNNING
15/10/07 12:20:01 INFO AppClient$ClientEndpoint: Executor updated: 
app-20151007121501-0022/69 is now LOADING
15/10/07 12:20:01 INFO AppClient$ClientEndpoint: Executor updated: 
app-20151007121501-0022/69 is now EXITED (Command exited with code 1)
15/10/07 12:20:01 INFO SparkDeploySchedulerBackend: Executor 
app-20151007121501-0022/69 removed: Command exited with code 1
15/10/07 12:20:01 INFO SparkDeploySchedulerBackend: Asked to remove 
non-existent executor 69
15/10/07 12:20:01 INFO AppClient$ClientEndpoint: Executor added: 
app-20151007121501-0022/70 on worker-20151007063932-162.101.194.44-57091 
(162.101.194.44:57091) with 32 cores
15/10/07 12:20:01 INFO SparkDeploySchedulerBackend: Granted executor ID 
app-20151007121501-0022/70 on hostPort 162.101.194.44:57091 with 32 cores, 
100.0 GB RAM
15/10/07 12:20:01 INFO AppClient$ClientEndpoint: Executor updated: 
app-20151007121501-0022/70 is now RUNNING
15/10/07 12:20:01 INFO AppClient$ClientEndpoint: Executor updated: 
app-20151007121501-0022/70 is now LOADING
15/10/07 12:20:01 INFO AppClient$ClientEndpoint: Executor updated: 
app-20151007121501-0022/70 is now EXITED (Command exited with code 1)
15/10/07 12:20:01 INFO SparkDeploySchedulerBackend: Executor 
app-20151007121501-0022/70 removed: Command exited with code 1
15/10/07 12:20:01 INFO SparkDeploySchedulerBackend: Asked to remove 
non-existent executor 70
15/10/07 12:20:01 INFO AppClient$ClientEndpoint: Executor added: 
app-20151007121501-0022/71 on worker-20151007063932-162.101.194.44-57091 
(162.101.194.44:57091) with 32 cores
15/10/07 12:20:01 INFO SparkDeploySchedulerBackend: Granted executor ID 
app-20151007121501-0022/71 on 

Help with big data operation performance

2015-10-06 Thread Saif.A.Ellafi
Hi all,

In a standalone cluster operation, with more than 80 GB of RAM in each node, I 
am trying to:

1.  load a partitioned JSON dataframe which weighs around 100 GB as input
2.  apply transformations such as casting some column types
3.  get some percentiles, which involves sortBy, RDD transformations and lookup actions
4.  flatten many unordered rows with the HiveContext explode function
5.  compute a histogram for each column of the dataframe

I have tried many strategies, from threaded per-column transformations to a 
for loop over each column. I tried to put as many common transformations outside 
this loop as possible, and to persist the original dataframe at different points. 
Also FIFO vs FAIR, speculation, etc.

With the dataframe limited to 50k rows, the operation is successful in local[32], 
in a somewhat short amount of time, e.g. 30 minutes for all columns, where around 
14 minutes is just loading the data into memory and doing a count. But when I 
want to go further, I need to go into cluster mode.

The first transformations and the percentile calculations are very fast, 
faster than local mode. But computing the histogram on the final dataframe gets 
stuck; it seems stuck at garbage collection operations and never completes. 
Trying the 50k-limited dataframe in cluster mode, exactly the same happens 
where local mode succeeded. The task seems to hang at some random point, always 
in the same stage, at the very end of the histogram computation.

I have also tried repartitioning up to 1024 pieces and coalescing down to 1 
piece, with no different result. The process always hangs up in cluster mode. 
Local mode cannot handle this big operation and ends up stuck somewhere as 
well. I have enabled the parallel GC too.
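
For reference, a per-column histogram of this kind can be sketched as follows; 
the column list and bucket count are placeholders, and it assumes numeric 
(DoubleType) columns rather than whatever the real job computes:

import org.apache.spark.sql.DataFrame

// Sketch: a fixed-bucket histogram per numeric column via the RDD histogram API.
def histograms(df: DataFrame, cols: Seq[String], buckets: Int = 20): Seq[(String, (Array[Double], Array[Long]))] =
  cols.map { c =>
    val values = df.select(c).rdd.map(_.getDouble(0))
    c -> values.histogram(buckets)   // returns (bucket boundaries, counts)
  }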

How can I proceed and diagnose?
Any help is appreciated
Saif



Please help: Processes with HiveContext slower in cluster

2015-10-05 Thread Saif.A.Ellafi
Hi,

I have a HiveContext job which takes less than 1 minute to complete in local 
mode with 16 cores.
However, when I launch it on the standalone cluster, it takes forever and 
probably never finishes, even when the cluster is running only the same single 
node on which I execute it locally.

How could I diagnose this issue? where can I start?

Thanks!
Saif



How to change verbosity level and redirect verbosity to file?

2015-10-05 Thread Saif.A.Ellafi
Hi,

I would like to read the full spark-submit log once a job is completed. Since 
the output is not on stdout, how could I redirect Spark output to a file in Linux?

Thanks,
Saif



RE: Accumulator of rows?

2015-10-02 Thread Saif.A.Ellafi
Thank you, exactly what I was looking for. I had read of it before but never 
made the association.

Saif

From: Adrian Tanase [mailto:atan...@adobe.com]
Sent: Friday, October 02, 2015 8:24 AM
To: Ellafi, Saif A.; user@spark.apache.org
Subject: Re: Accumulator of rows?

Have you seen window functions?
https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

From: "saif.a.ell...@wellsfargo.com"
Date: Thursday, October 1, 2015 at 9:44 PM
To: "user@spark.apache.org"
Subject: Accumulator of rows?

Hi all,

I need to repeat a couple of rows from a dataframe, n times each. To do so, I 
plan to create a new DataFrame, but I am unable to find a way to accumulate 
“Rows” somewhere; as this might get huge, I can’t accumulate into a mutable 
Array, I think.

Thanks,
Saif



from groupBy return a DataFrame without aggregation?

2015-10-02 Thread Saif.A.Ellafi
Given ID, DATE, I need all sorted dates per ID. What is the easiest way?

I got this but I don't like it:
val l = zz.groupBy("id", "dt").agg($"dt".as("dummy")).sort($"dt".asc)
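
A simpler shape that may be enough, assuming zz has columns "id" and "dt" and 
duplicate (id, dt) pairs can be dropped (a sketch, not the only option):

val sortedDates = zz.select("id", "dt").distinct.orderBy("id", "dt")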

Saif



RE: how to broadcast huge lookup table?

2015-10-02 Thread Saif.A.Ellafi
Hi, thank you

I would prefer to leave writing-to-disk as a last resort. Is it a last resort?

Saif

From: Ted Yu [mailto:yuzhih...@gmail.com]
Sent: Friday, October 02, 2015 3:54 PM
To: Ellafi, Saif A.
Cc: user
Subject: Re: how to broadcast huge lookup table?

Have you considered using external storage such as hbase for storing the look 
up table ?

Cheers

On Fri, Oct 2, 2015 at 11:50 AM, 
> wrote:
I tried broadcasting a key-value rdd, but then I cannot perform any rdd-actions 
inside a map/foreach function of another rdd.

any tips? If going into scala collections I end up with huge memory bottlenecks.

Saif




how to broadcast huge lookup table?

2015-10-02 Thread Saif.A.Ellafi
I tried broadcasting a key-value rdd, but then I cannot perform any rdd-actions 
inside a map/foreach function of another rdd.

any tips? If going into scala collections I end up with huge memory bottlenecks.
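
Two sketches of the usual workarounds, with placeholder RDDs built inline; the 
real ones would come from the actual sources:

import org.apache.spark.SparkContext

def lookupExample(sc: SparkContext): Unit = {
  val lookupRdd = sc.parallelize(Seq("a" -> 1, "b" -> 2))
  val dataRdd   = sc.parallelize(Seq("a" -> 10.0, "b" -> 20.0, "c" -> 30.0))

  // (a) Broadcast a driver-side Map: only viable if the lookup fits in driver memory.
  val bc = sc.broadcast(lookupRdd.collectAsMap())
  val viaBroadcast = dataRdd.map { case (k, v) => (k, v, bc.value.getOrElse(k, -1)) }

  // (b) Keep everything distributed with a join when the lookup is too big to collect.
  val viaJoin = dataRdd.join(lookupRdd)   // RDD[(String, (Double, Int))]

  viaBroadcast.count()
  viaJoin.count()
}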

Saif



Accumulator of rows?

2015-10-01 Thread Saif.A.Ellafi
Hi all,

I need to repeat a couple of rows from a dataframe, n times each. To do so, I 
plan to create a new DataFrame, but I am unable to find a way to accumulate 
"Rows" somewhere; as this might get huge, I can't accumulate into a mutable 
Array, I think.
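
A sketch that avoids accumulating rows on the driver, assuming the repetition 
count lives in an Int column of the same frame; the names here are placeholders:

import org.apache.spark.sql.{DataFrame, SQLContext}

// Repeat each row as many times as its count column says, all distributed.
def repeatRows(sqlContext: SQLContext, df: DataFrame, nCol: String): DataFrame = {
  val nIdx = df.schema.fieldNames.indexOf(nCol)
  val repeated = df.rdd.flatMap(r => Seq.fill(r.getInt(nIdx))(r))
  sqlContext.createDataFrame(repeated, df.schema)
}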

Thanks,
Saif



What is the best way to submit multiple tasks?

2015-09-30 Thread Saif.A.Ellafi
Hi all,

I have a process where I do some calculations on each one of the columns of a 
dataframe.
Intrinsically, I run across each column with a for loop. On the other hand, 
each process itself is not entirely distributable.

To speed up the process, I would like to submit a Spark job for each column. Any 
suggestions? I was thinking of primitive threads sharing a Spark context.
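
A sketch of the threads-sharing-one-context idea using Scala Futures; "process" 
stands in for the real per-column computation:

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Submit one Spark job per column concurrently from the driver.
def runAllColumns(columns: Seq[String])(process: String => Unit): Unit = {
  val jobs = columns.map(c => Future(process(c)))
  Await.result(Future.sequence(jobs), Duration.Inf)
}

SparkContext is thread-safe for submitting jobs, and with spark.scheduler.mode 
set to FAIR the concurrent jobs share the executors more evenly.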

Thank you,
Saif



Cant perform full outer join

2015-09-29 Thread Saif.A.Ellafi
Hi all,

So I have two dataframes, with two columns: DATE and VALUE.

Performing this:
data = data.join(cur_data, data("DATE") === cur_data("DATE"), "outer")

returns
Exception in thread "main" org.apache.spark.sql.AnalysisException: Reference 
'DATE' is ambiguous, could be: DATE#0, DATE#3.;

But if I change one of the column names, I will get two columns and won't 
really merge the "DATE" column as I wish. Any ideas, without going into 
non-trivial procedures?
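
One sketch that keeps a single merged DATE column, using aliases plus coalesce; 
the VALUE_a / VALUE_b output names are illustrative:

import org.apache.spark.sql.functions.{coalesce, col}

val joined = data.as("a")
  .join(cur_data.as("b"), col("a.DATE") === col("b.DATE"), "outer")
  .select(
    coalesce(col("a.DATE"), col("b.DATE")).as("DATE"),
    col("a.VALUE").as("VALUE_a"),
    col("b.VALUE").as("VALUE_b"))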

Thanks,
Saif



Where can I learn how to write udf?

2015-09-14 Thread Saif.A.Ellafi
Hi all,

I am failing to find a proper guide or tutorial on how to write proper UDF 
functions in Scala.
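
A minimal sketch of the DataFrame-style UDF API; "amount" is an illustrative 
column on some dataframe df, with sqlContext as in the shell:

import org.apache.spark.sql.functions.udf

// Wrap a plain Scala function as a column expression.
val toDollars = udf((cents: Long) => cents / 100.0)
val withDollars = df.withColumn("dollars", toDollars(df("amount")))

// Or register it for use from SQL strings:
sqlContext.udf.register("toDollars", (cents: Long) => cents / 100.0)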

Appreciate the effort saving,
Saif



Compress JSON dataframes

2015-09-08 Thread Saif.A.Ellafi
Hi,

I am trying to figure out a way to compress df.write.json() but have not been 
successful, even after changing spark.io.compression.
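
One route that should work on 1.x, sketched with an illustrative output path: 
write the JSON lines as compressed text instead of going through df.write.json:

import org.apache.hadoop.io.compress.GzipCodec

// toJSON gives an RDD[String] of one JSON document per row.
df.toJSON.saveAsTextFile("/tmp/out_json_gz", classOf[GzipCodec])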

Any thoughts?
Saif



RE: What is the current status of ML ?

2015-09-01 Thread Saif.A.Ellafi
Thank you, so I was inversely confused. At first I thought MLlib was the 
future, but based on what you say, MLlib will be the past. Interesting.
This means that if I go forward using the pipelines system, I shouldn't become 
obsolete.

Any more insights welcome,
Saif

-Original Message-
From: Sean Owen [mailto:so...@cloudera.com] 
Sent: Tuesday, September 01, 2015 3:31 PM
To: Ellafi, Saif A.
Cc: user
Subject: Re: What is the current status of ML ?

I think the story is that the new spark.ml "pipelines" API is the future. Most 
(all?) of the spark.mllib functionality has been ported over and/or translated. 
I don't know that spark.mllib will actually be deprecated soon -- not until 
spark.ml is fully blessed as 'stable' I'd imagine, at least. Even if it were I 
don't think it would go away. You can use spark.mllib now as it is pretty 
stable with some confidence, and look to spark.ml if you're interested in the 
"2.0" of MLlib and are willing to work with APIs that may change a bit.

On Tue, Sep 1, 2015 at 7:23 PM,   wrote:
> Hi all,
>
> I am a little bit confused, having been introduced to Spark recently. 
> What is going on with ML? Is it going to be deprecated? Or are all of 
> its features valid and being built upon? It has a set of features and 
> ML constructors which I like to use, but I need to confirm whether 
> these functionalities will remain valid in the future.
> I am reading here and there different takes on this, including on the 
> official site that all contributions should go to MLlib, and even that 
> ML uses MLlib already.
>
> Saif
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



What is the current status of ML ?

2015-09-01 Thread Saif.A.Ellafi
Hi all,

I am a little bit confused, having been introduced to Spark recently. What is 
going on with ML? Is it going to be deprecated? Or are all of its features 
valid and being built upon? It has a set of features and ML constructors which 
I like to use, but I need to confirm whether these functionalities will remain 
valid in the future.
I am reading here and there different takes on this, including on the official 
site that all contributions should go to MLlib, and even that ML uses MLlib already.

Saif



Best way to filter null on any column?

2015-08-27 Thread Saif.A.Ellafi
Hi all,

What would be a good way to filter rows in a dataframe where any value in the 
row is null? I wouldn't want to go through each column manually.
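
A sketch of what DataFrameNaFunctions already offers on some dataframe df; the 
column names are illustrative:

val noNulls    = df.na.drop()                  // drop rows with a null in any column
val noNullKeys = df.na.drop(Seq("id", "dt"))   // or only consider specific columns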

Thanks,
Saif



RE: Help! Stuck using withColumn

2015-08-27 Thread Saif.A.Ellafi
Hello, thank you for the response.

I found a blog where a guy explains that it is not possible to join columns 
from different data frames.

I was trying to modify one column’s information, so I selected it and then 
tried to replace the original dataframe column. I found another way.

Thanks
Saif

From: Silvio Fiorito [mailto:silvio.fior...@granturing.com]
Sent: Wednesday, August 26, 2015 8:54 PM
To: Ellafi, Saif A.; user@spark.apache.org
Subject: Re: Help! Stuck using withColumn

Hi Saif,

In both cases you’re referencing columns that don’t exist in the current 
DataFrame.

The first email you did a select and then a withColumn for ‘month_date_cur' on 
the resulting DF, but that column does not exist, because you did a select for 
only ‘month_balance’.

In the second email you’re using 2 different DFs and trying to select a column 
from one in a withColumn on the other, that just wouldn’t work. Also, there’s 
no explicit column names given to either DF, so that column doesn’t exist.

Did you intend to do a join instead?

Thanks,
Silvio

From: saif.a.ell...@wellsfargo.com
Date: Wednesday, August 26, 2015 at 6:06 PM
To: saif.a.ell...@wellsfargo.com, user@spark.apache.org
Subject: RE: Help! Stuck using withColumn

I can reproduce this even simpler with the following:

val gf = sc.parallelize(Array(3,6,4,7,3,4,5,5,31,4,5,2)).toDF("ASD")
val ff = sc.parallelize(Array(4,6,2,3,5,1,4,6,23,6,4,7)).toDF("GFD")

gf.withColumn("DSA", ff.col("GFD"))

org.apache.spark.sql.AnalysisException: resolved attribute(s) GFD#421 missing 
from ASD#419 in operator !Project [ASD#419,GFD#421 AS DSA#422];
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:42)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:121)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:50)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:98)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:50)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:42)
at 
org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:931)
at org.apache.spark.sql.DataFrame.init(DataFrame.scala:131)
at 
org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$logicalPlanToDataFrame(DataFrame.scala:154)
at org.apache.spark.sql.DataFrame.select(DataFrame.scala:595)
at org.apache.spark.sql.DataFrame.withColumn(DataFrame.scala:1039)


From: saif.a.ell...@wellsfargo.com [mailto:saif.a.ell...@wellsfargo.com]
Sent: Wednesday, August 26, 2015 6:47 PM
To: user@spark.apache.org
Subject: Help! Stuck using withColumn

This simple comand call:

val final_df = data.select("month_balance").withColumn("month_date", 
data.col("month_date_curr"))

Is throwing:

org.apache.spark.sql.AnalysisException: resolved attribute(s) 
month_date_curr#324 missing from month_balance#234 in operator !Project 
[month_balance#234, month_date_curr#324 AS month_date_curr#408];
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:42)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:121)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:50)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:98)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:50)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:42)
at 
org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:931)
at org.apache.spark.sql.DataFrame.init(DataFrame.scala:131)
at 
org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$logicalPlanToDataFrame(DataFrame.scala:154)
at org.apache.spark.sql.DataFrame.select(DataFrame.scala:595)
at org.apache.spark.sql.DataFrame.withColumn(DataFrame.scala:1039)





Is there a way to store RDD and load it with its original format?

2015-08-27 Thread Saif.A.Ellafi
Hi,

Any way to store/load RDDs keeping their original object type instead of strings?

I am having trouble with parquet (there is always some error at class 
conversion), and I don't use Hadoop. Looking for alternatives.
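
A sketch using object files, which keep the element type through Java 
serialization and accept a plain local path; the Record type and path are 
illustrative:

// Save and reload an RDD of a custom type without Parquet or a Hadoop cluster.
case class Record(id: Long, name: String)

val rdd = sc.parallelize(Seq(Record(1L, "a"), Record(2L, "b")))
rdd.saveAsObjectFile("/tmp/records_obj")
val restored = sc.objectFile[Record]("/tmp/records_obj")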

Thanks in advance
Saif



Help! Stuck using withColumn

2015-08-26 Thread Saif.A.Ellafi
This simple comand call:

val final_df = data.select("month_balance").withColumn("month_date", 
data.col("month_date_curr"))

Is throwing:

org.apache.spark.sql.AnalysisException: resolved attribute(s) 
month_date_curr#324 missing from month_balance#234 in operator !Project 
[month_balance#234, month_date_curr#324 AS month_date_curr#408];
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:42)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:121)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:50)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:98)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:50)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:42)
at 
org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:931)
at org.apache.spark.sql.DataFrame.init(DataFrame.scala:131)
at 
org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$logicalPlanToDataFrame(DataFrame.scala:154)
at org.apache.spark.sql.DataFrame.select(DataFrame.scala:595)
at org.apache.spark.sql.DataFrame.withColumn(DataFrame.scala:1039)





RE: Help! Stuck using withColumn

2015-08-26 Thread Saif.A.Ellafi
I can reproduce this even simpler with the following:

val gf = sc.parallelize(Array(3,6,4,7,3,4,5,5,31,4,5,2)).toDF("ASD")
val ff = sc.parallelize(Array(4,6,2,3,5,1,4,6,23,6,4,7)).toDF("GFD")

gf.withColumn("DSA", ff.col("GFD"))

org.apache.spark.sql.AnalysisException: resolved attribute(s) GFD#421 missing 
from ASD#419 in operator !Project [ASD#419,GFD#421 AS DSA#422];
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:42)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:121)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:50)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:98)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:50)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:42)
at 
org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:931)
at org.apache.spark.sql.DataFrame.init(DataFrame.scala:131)
at 
org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$logicalPlanToDataFrame(DataFrame.scala:154)
at org.apache.spark.sql.DataFrame.select(DataFrame.scala:595)
at org.apache.spark.sql.DataFrame.withColumn(DataFrame.scala:1039)


From: saif.a.ell...@wellsfargo.com [mailto:saif.a.ell...@wellsfargo.com]
Sent: Wednesday, August 26, 2015 6:47 PM
To: user@spark.apache.org
Subject: Help! Stuck using withColumn

This simple comand call:

val final_df = data.select("month_balance").withColumn("month_date", 
data.col("month_date_curr"))

Is throwing:

org.apache.spark.sql.AnalysisException: resolved attribute(s) 
month_date_curr#324 missing from month_balance#234 in operator !Project 
[month_balance#234, month_date_curr#324 AS month_date_curr#408];
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:42)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:121)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:50)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:98)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:50)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:42)
at 
org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:931)
at org.apache.spark.sql.DataFrame.init(DataFrame.scala:131)
at 
org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$logicalPlanToDataFrame(DataFrame.scala:154)
at org.apache.spark.sql.DataFrame.select(DataFrame.scala:595)
at org.apache.spark.sql.DataFrame.withColumn(DataFrame.scala:1039)





Scala: Overload method by its class type

2015-08-25 Thread Saif.A.Ellafi
Hi all,

I have SomeClass[TYPE] { def some_method(args: fixed_type_args): TYPE }

And at runtime, I create instances of this class with different AnyVal + String 
types, but the return type of some_method varies.

I know I could do this with an implicit object, IF some_method received a type, 
but in this case, I need to have the TYPE defined on its class instance, so for 
example:

val int_instance = new SomeClass[Int]
val str_instance = new SomeClass[String]
val result: Boolean = int_instance.some_method(args) > 0             // I expected INT here
val result2: Boolean = str_instance.some_method(args) contains "asdfg"  // I expected STRING here

without compilation errors.

Any ideas? I would like to implement something like this:

class SomeClass[TYPE] {

def some_method(args: Int): Int = {
process_integer_overloaded_method
}

def some_method(args: Int): String = {
process_string_overloaded_method
}

and so on.

Any ideas? Maybe store the class's TYPE in a constructor instead, as a variable 
somehow?
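
A sketch of one alternative: move the per-type behaviour into a type class so 
the return type follows the instance's type parameter. The method bodies below 
are placeholders:

trait SomeOps[T] { def some_method(args: Int): T }

object SomeOps {
  implicit val intOps: SomeOps[Int] = new SomeOps[Int] {
    def some_method(args: Int): Int = args + 1            // placeholder body
  }
  implicit val stringOps: SomeOps[String] = new SomeOps[String] {
    def some_method(args: Int): String = "value-" + args  // placeholder body
  }
}

class SomeClass[T](implicit ops: SomeOps[T]) {
  def some_method(args: Int): T = ops.some_method(args)
}

val int_instance = new SomeClass[Int]
val str_instance = new SomeClass[String]
val result: Boolean  = int_instance.some_method(0) > 0
val result2: Boolean = str_instance.some_method(0) contains "asdfg"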

Thanks
Saif



Want to install lz4 compression

2015-08-21 Thread Saif.A.Ellafi
Hi all,

I am using pre-compiled Spark with Hadoop 2.6. The LZ4 codec is not in Hadoop's 
native libraries, so I am not able to use it.

Can anyone suggest how to proceed? Hopefully I won't have to recompile Hadoop. I 
tried changing --driver-library-path to point directly at the standalone lz4 
package libraries, but of course it didn't work.

Thanks
Saif



Set custm worker id ?

2015-08-21 Thread Saif.A.Ellafi
Hi,

Is it possible in standalone mode to set up worker ID names, to avoid the 
worker-19248891237482379-ip..-port ?

Thanks,
Saif



RE: Scala: How to match a java object????

2015-08-19 Thread Saif.A.Ellafi
Hi, thank you all for the assistance.

It is odd; it works when creating a new java.math.BigDecimal object, but not if 
I work directly with

scala> 5 match { case x: java.math.BigDecimal => 2 }
<console>:23: error: scrutinee is incompatible with pattern type;
found   : java.math.BigDecimal
required: Int
  5 match { case x: java.math.BigDecimal => 2 }

I will try it and see how it works for my Seq[Any]. Thanks for the 
workarounds.
Saif

From: Sujit Pal [mailto:sujitatgt...@gmail.com]
Sent: Tuesday, August 18, 2015 6:25 PM
To: Ellafi, Saif A.
Cc: wrbri...@gmail.com; user
Subject: Re: Scala: How to match a java object

Hi Saif,

Would this work?

import scala.collection.JavaConversions._

new java.math.BigDecimal(5) match { case x: java.math.BigDecimal => 
x.doubleValue }

It gives me this on the Scala console:

res9: Double = 5.0

Assuming you had a stream of BigDecimals, you could just call map on it.

myBigDecimals.map(_.doubleValue)

to get your Seq of Doubles. You will need the JavaConversions._ import to allow 
Java Doubles to be treated by Scala as Scala Doubles.

-sujit

On Tue, Aug 18, 2015 at 12:59 PM, saif.a.ell...@wellsfargo.com wrote:
Hi, thank you for further assistance

you can reproduce this by simply running

5 match { case java.math.BigDecimal => 2 }

In my personal case, I am applying a map action to a Seq[Any], so the elements 
inside are of type Any, to which I need to apply a proper 
.asInstanceOf[WhoYouShouldBe].

Saif

From: William Briggs [mailto:wrbri...@gmail.com]
Sent: Tuesday, August 18, 2015 4:46 PM
To: Ellafi, Saif A.; user@spark.apache.org
Subject: Re: Scala: How to match a java object


Could you share your pattern matching expression that is failing?

On Tue, Aug 18, 2015, 3:38 PM saif.a.ell...@wellsfargo.com wrote:
Hi all,

I am trying to run a spark job, in which I receive java.math.BigDecimal 
objects, instead of the scala equivalents, and I am trying to convert them into 
Doubles.
If I try to match-case this object class, I get: “error: object 
java.math.BigDecimal is not a value”

How could I get around matching java objects? I would like to avoid a multiple 
try-catch on ClassCastExceptions for all my checks.

Thank you,
Saif




RE: Scala: How to match a java object????

2015-08-19 Thread Saif.A.Ellafi
It is okay. This methodology works very well for mapping objects of my Seq[Any].

It is indeed very cool :-)

Saif

From: Ted Yu [mailto:yuzhih...@gmail.com]
Sent: Wednesday, August 19, 2015 10:47 AM
To: Ellafi, Saif A.
Cc: sujitatgt...@gmail.com; wrbri...@gmail.com; user@spark.apache.org
Subject: Re: Scala: How to match a java object

Saif:
In your example below, the error was due to there being no automatic conversion 
from Int to BigDecimal.

Cheers


On Aug 19, 2015, at 6:40 AM, saif.a.ell...@wellsfargo.com wrote:
Hi, thank you all for the assistance.

It is odd; it works when creating a new java.math.BigDecimal object, but not if 
I work directly with

scala> 5 match { case x: java.math.BigDecimal => 2 }
<console>:23: error: scrutinee is incompatible with pattern type;
found   : java.math.BigDecimal
required: Int
  5 match { case x: java.math.BigDecimal => 2 }

I will try it and see how it works for my Seq[Any]. Thanks for the 
workarounds.
Saif

From: Sujit Pal [mailto:sujitatgt...@gmail.com]
Sent: Tuesday, August 18, 2015 6:25 PM
To: Ellafi, Saif A.
Cc: wrbri...@gmail.com; user
Subject: Re: Scala: How to match a java object

Hi Saif,

Would this work?

import scala.collection.JavaConversions._

new java.math.BigDecimal(5) match { case x: java.math.BigDecimal => 
x.doubleValue }

It gives me this on the Scala console:

res9: Double = 5.0

Assuming you had a stream of BigDecimals, you could just call map on it.

myBigDecimals.map(_.doubleValue)

to get your Seq of Doubles. You will need the JavaConversions._ import to allow 
Java Doubles to be treated by Scala as Scala Doubles.

-sujit

On Tue, Aug 18, 2015 at 12:59 PM, saif.a.ell...@wellsfargo.com wrote:
Hi, thank you for further assistance

you can reproduce this by simply running

5 match { case java.math.BigDecimal => 2 }

In my personal case, I am applying a map action to a Seq[Any], so the elements 
inside are of type Any, to which I need to apply a proper 
.asInstanceOf[WhoYouShouldBe].

Saif

From: William Briggs [mailto:wrbri...@gmail.com]
Sent: Tuesday, August 18, 2015 4:46 PM
To: Ellafi, Saif A.; user@spark.apache.org
Subject: Re: Scala: How to match a java object


Could you share your pattern matching expression that is failing?

On Tue, Aug 18, 2015, 3:38 PM saif.a.ell...@wellsfargo.com wrote:
Hi all,

I am trying to run a spark job, in which I receive java.math.BigDecimal 
objects, instead of the scala equivalents, and I am trying to convert them into 
Doubles.
If I try to match-case this object class, I get: “error: object 
java.math.BigDecimal is not a value”

How could I get around matching java objects? I would like to avoid a multiple 
try-catch on ClassCastExceptions for all my checks.

Thank you,
Saif




RE: Scala: How to match a java object????

2015-08-18 Thread Saif.A.Ellafi
Hi, thank you for further assistance

you can reproduce this by simply running

5 match { case java.math.BigDecimal => 2 }

In my personal case, I am applying a map action to a Seq[Any], so the elements 
inside are of type Any, to which I need to apply a proper 
.asInstanceOf[WhoYouShouldBe].

Saif

From: William Briggs [mailto:wrbri...@gmail.com]
Sent: Tuesday, August 18, 2015 4:46 PM
To: Ellafi, Saif A.; user@spark.apache.org
Subject: Re: Scala: How to match a java object


Could you share your pattern matching expression that is failing?

On Tue, Aug 18, 2015, 3:38 PM saif.a.ell...@wellsfargo.com wrote:
Hi all,

I am trying to run a spark job, in which I receive java.math.BigDecimal 
objects, instead of the scala equivalents, and I am trying to convert them into 
Doubles.
If I try to match-case this object class, I get: “error: object 
java.math.BigDecimal is not a value”

How could I get around matching java objects? I would like to avoid a multiple 
try-catch on ClassCastExceptions for all my checks.

Thank you,
Saif



RE: Scala: How to match a java object????

2015-08-18 Thread Saif.A.Ellafi
Hi, Can you please elaborate? I am confused :-)

Saif

-Original Message-
From: Marcelo Vanzin [mailto:van...@cloudera.com] 
Sent: Tuesday, August 18, 2015 5:15 PM
To: Ellafi, Saif A.
Cc: wrbri...@gmail.com; user@spark.apache.org
Subject: Re: Scala: How to match a java object

On Tue, Aug 18, 2015 at 12:59 PM, saif.a.ell...@wellsfargo.com wrote:

 5 match { case java.math.BigDecimal => 2 }

5 match { case _: java.math.BigDecimal => 2 }

-- 
Marcelo


Scala: How to match a java object????

2015-08-18 Thread Saif.A.Ellafi
Hi all,

I am trying to run a spark job, in which I receive java.math.BigDecimal 
objects, instead of the scala equivalents, and I am trying to convert them into 
Doubles.
If I try to match-case this object class, I get: error: object 
java.math.BigDecimal is not a value

How could I get around matching java objects? I would like to avoid a multiple 
try-catch on ClassCastExceptions for all my checks.

Thank you,
Saif



Cannot cast to Tuple when running in cluster mode

2015-08-14 Thread Saif.A.Ellafi
Hi All,

I have a working program, in which I create two big Tuple2s out of the data. 
This seems to work in local mode, but when I switch over to standalone cluster 
mode, I get this error at the very beginning:

15/08/14 10:22:25 WARN TaskSetManager: Lost task 4.0 in stage 1.0 (TID 10, 
162.101.194.44): java.lang.ClassCastException: 
scala.collection.Iterator$$anon$13 cannot be cast to scala.Tuple2
at 
org.apache.spark.sql.DataFrame$$anonfun$33.apply(DataFrame.scala:1189)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

The data comes from JDBC, but I also tried persisting it into memory to turn it 
into a collection, in case JDBC was the problem.

Any advice?
Saif



Help with persist: Data is requested again

2015-08-14 Thread Saif.A.Ellafi
Hello all,

I am writing a program which pulls data from a database. I run a couple of 
computations, but in the end I have a while loop, in which I make a 
modification to the persisted data, e.g.:

val data = PairRDD... persist()
var i = 0
while (i < 10) {
  val data_mod = data.map { case (k, v) => (k + 1, v) }
  val data_joined = data.join(data_mod)
  // ... do stuff with data_joined
  i += 1
}

Sadly, the shuffle inside the while loop triggers a JDBC call each time, and 
that is very slow. It is not finding the data locally.

How can I help myself?
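
One sketch that usually helps: materialize the cache with an action before 
entering the loop, so the JDBC source is read only once. Here "jdbcPairRdd" is a 
placeholder for however the pair RDD is actually built:

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// The first action populates the cache; later joins hit memory/disk instead of JDBC.
def cacheUpFront[K, V](jdbcPairRdd: RDD[(K, V)]): RDD[(K, V)] = {
  val data = jdbcPairRdd.persist(StorageLevel.MEMORY_AND_DISK)
  data.count()
  data
}
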
Saif



Parquet without hadoop: Possible?

2015-08-11 Thread Saif.A.Ellafi
Hi all,

I don't have any Hadoop FS installed in my environment, but I would like to 
store dataframes in parquet files. I am failing to do so; if it is possible, 
does anyone have any pointers?

Thank you,
Saif



RE: Parquet without hadoop: Possible?

2015-08-11 Thread Saif.A.Ellafi
I am launching my spark-shell
spark-1.4.1-bin-hadoop2.6/bin/spark-shell

15/08/11 09:43:32 INFO SparkILoop: Created sql context (with Hive support)..
SQL context available as sqlContext.

scala> val data = sc.parallelize(Array(2,3,5,7,2,3,6,1)).toDF
scala> data.write.parquet("/var/data/Saif/pq")

Then I get a million errors:
15/08/11 09:46:01 INFO CodecPool: Got brand-new compressor [.gz]
15/08/11 09:46:01 INFO CodecPool: Got brand-new compressor [.gz]
15/08/11 09:46:01 INFO CodecPool: Got brand-new compressor [.gz]
15/08/11 09:46:07 ERROR InsertIntoHadoopFsRelation: Aborting task.
java.lang.OutOfMemoryError: Java heap space
15/08/11 09:46:09 ERROR InsertIntoHadoopFsRelation: Aborting task.
java.lang.OutOfMemoryError: Java heap space
15/08/11 09:46:08 ERROR InsertIntoHadoopFsRelation: Aborting task.
java.lang.OutOfMemoryError: Java heap space
15/08/11 09:46:08 ERROR InsertIntoHadoopFsRelation: Aborting task.
java.lang.OutOfMemoryError: Java heap space
15/08/11 09:46:09 ERROR InsertIntoHadoopFsRelation: Aborting task.
java.lang.OutOfMemoryError: Java heap space
15/08/11 09:46:09 ERROR InsertIntoHadoopFsRelation: Aborting task.
java.lang.OutOfMemoryError: Java heap space
15/08/11 09:46:08 ERROR InsertIntoHadoopFsRelation: Aborting task.
java.lang.OutOfMemoryError: Java heap space
15/08/11 09:46:07 ERROR InsertIntoHadoopFsRelation: Aborting task.
java.lang.OutOfMemoryError: Java heap space
15/08/11 09:46:07 ERROR InsertIntoHadoopFsRelation: Aborting task.
java.lang.OutOfMemoryError: Java heap space
at 
parquet.bytes.CapacityByteArrayOutputStream.initSlabs(CapacityByteArrayOutputStream.java:65)
at 
parquet.bytes.CapacityByteArrayOutputStream.init(CapacityByteArrayOutputStream.java:57)
at 
parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.init(ColumnChunkPageWriteStore.java:68)
at 
parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.init(ColumnChunkPageWriteStore.java:48)
at 
parquet.hadoop.ColumnChunkPageWriteStore.getPageWriter(ColumnChunkPageWriteStore.java:215)
at 
parquet.column.impl.ColumnWriteStoreImpl.newMemColumn(ColumnWriteStoreImpl.java:67)
at 
parquet.column.impl.ColumnWriteStoreImpl.getColumnWriter(ColumnWriteStoreImpl.java:56)
at 
parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.init(MessageColumnIO.java:178)
at parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:369)
at 
parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:108)
at 
parquet.hadoop.InternalParquetRecordWriter.init(InternalParquetRecordWriter.java:94)
at 
parquet.hadoop.ParquetRecordWriter.init(ParquetRecordWriter.java:64)
at 
parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:282)
at 
parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
at 
org.apache.spark.sql.parquet.ParquetOutputWriter.init(newParquet.scala:83)
at 
org.apache.spark.sql.parquet.ParquetRelation2$$anon$4.newInstance(newParquet.scala:229)
at 
org.apache.spark.sql.sources.DefaultWriterContainer.initWriters(commands.scala:470)
at 
org.apache.spark.sql.sources.BaseWriterContainer.executorSideSetup(commands.scala:360)
at 
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:172)
at 
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:160)
at 
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:160)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
15/08/11 09:46:08 ERROR InsertIntoHadoopFsRelation: Aborting task.
...
...
.
15/08/11 09:46:10 ERROR DefaultWriterContainer: Task attempt 
attempt_201508110946__m_11_0 aborted.
15/08/11 09:46:10 ERROR Executor: Exception in task 31.0 in stage 0.0 (TID 31)
org.apache.spark.SparkException: Task failed while writing rows.
at 
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:191)
at 
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:160)
at 
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:160)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at 

RE: Parquet without hadoop: Possible?

2015-08-11 Thread Saif.A.Ellafi
I confirm that it works,

I was just having this issue: https://issues.apache.org/jira/browse/SPARK-8450

Saif

From: Ellafi, Saif A.
Sent: Tuesday, August 11, 2015 12:01 PM
To: Ellafi, Saif A.; deanwamp...@gmail.com
Cc: user@spark.apache.org
Subject: RE: Parquet without hadoop: Possible?

Sorry, I provided bad information. This example worked fine with reduced 
parallelism.

It seems my problem has to do with something specific to the real data frame 
at reading point.

Saif


From: saif.a.ell...@wellsfargo.com [mailto:saif.a.ell...@wellsfargo.com]
Sent: Tuesday, August 11, 2015 11:49 AM
To: deanwamp...@gmail.com
Cc: user@spark.apache.org
Subject: RE: Parquet without hadoop: Possible?

I am launching my spark-shell
spark-1.4.1-bin-hadoop2.6/bin/spark-shell

15/08/11 09:43:32 INFO SparkILoop: Created sql context (with Hive support)..
SQL context available as sqlContext.

scala> val data = sc.parallelize(Array(2,3,5,7,2,3,6,1)).toDF
scala> data.write.parquet("/var/data/Saif/pq")

Then I get a million errors:
15/08/11 09:46:01 INFO CodecPool: Got brand-new compressor [.gz]
15/08/11 09:46:01 INFO CodecPool: Got brand-new compressor [.gz]
15/08/11 09:46:01 INFO CodecPool: Got brand-new compressor [.gz]
15/08/11 09:46:07 ERROR InsertIntoHadoopFsRelation: Aborting task.
java.lang.OutOfMemoryError: Java heap space
15/08/11 09:46:09 ERROR InsertIntoHadoopFsRelation: Aborting task.
java.lang.OutOfMemoryError: Java heap space
15/08/11 09:46:08 ERROR InsertIntoHadoopFsRelation: Aborting task.
java.lang.OutOfMemoryError: Java heap space
15/08/11 09:46:08 ERROR InsertIntoHadoopFsRelation: Aborting task.
java.lang.OutOfMemoryError: Java heap space
15/08/11 09:46:09 ERROR InsertIntoHadoopFsRelation: Aborting task.
java.lang.OutOfMemoryError: Java heap space
15/08/11 09:46:09 ERROR InsertIntoHadoopFsRelation: Aborting task.
java.lang.OutOfMemoryError: Java heap space
15/08/11 09:46:08 ERROR InsertIntoHadoopFsRelation: Aborting task.
java.lang.OutOfMemoryError: Java heap space
15/08/11 09:46:07 ERROR InsertIntoHadoopFsRelation: Aborting task.
java.lang.OutOfMemoryError: Java heap space
15/08/11 09:46:07 ERROR InsertIntoHadoopFsRelation: Aborting task.
java.lang.OutOfMemoryError: Java heap space
at 
parquet.bytes.CapacityByteArrayOutputStream.initSlabs(CapacityByteArrayOutputStream.java:65)
at 
parquet.bytes.CapacityByteArrayOutputStream.init(CapacityByteArrayOutputStream.java:57)
at 
parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.init(ColumnChunkPageWriteStore.java:68)
at 
parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.init(ColumnChunkPageWriteStore.java:48)
at 
parquet.hadoop.ColumnChunkPageWriteStore.getPageWriter(ColumnChunkPageWriteStore.java:215)
at 
parquet.column.impl.ColumnWriteStoreImpl.newMemColumn(ColumnWriteStoreImpl.java:67)
at 
parquet.column.impl.ColumnWriteStoreImpl.getColumnWriter(ColumnWriteStoreImpl.java:56)
at 
parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.init(MessageColumnIO.java:178)
at parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:369)
at 
parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:108)
at 
parquet.hadoop.InternalParquetRecordWriter.init(InternalParquetRecordWriter.java:94)
at 
parquet.hadoop.ParquetRecordWriter.init(ParquetRecordWriter.java:64)
at 
parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:282)
at 
parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
at 
org.apache.spark.sql.parquet.ParquetOutputWriter.init(newParquet.scala:83)
at 
org.apache.spark.sql.parquet.ParquetRelation2$$anon$4.newInstance(newParquet.scala:229)
at 
org.apache.spark.sql.sources.DefaultWriterContainer.initWriters(commands.scala:470)
at 
org.apache.spark.sql.sources.BaseWriterContainer.executorSideSetup(commands.scala:360)
at 
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:172)
at 
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:160)
at 
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:160)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
15/08/11 09:46:08 ERROR InsertIntoHadoopFsRelation: Aborting task.
...
...
.
15/08/11 

RE: Parquet without hadoop: Possible?

2015-08-11 Thread Saif.A.Ellafi
Sorry, I provided bad information. This example worked fine with reduced 
parallelism.

It seems my problem has to do with something specific to the real data frame 
at reading point.

Saif


From: saif.a.ell...@wellsfargo.com [mailto:saif.a.ell...@wellsfargo.com]
Sent: Tuesday, August 11, 2015 11:49 AM
To: deanwamp...@gmail.com
Cc: user@spark.apache.org
Subject: RE: Parquet without hadoop: Possible?

I am launching my spark-shell
spark-1.4.1-bin-hadoop2.6/bin/spark-shell

15/08/11 09:43:32 INFO SparkILoop: Created sql context (with Hive support)..
SQL context available as sqlContext.

scala> val data = sc.parallelize(Array(2,3,5,7,2,3,6,1)).toDF
scala> data.write.parquet("/var/data/Saif/pq")

Then I get a million errors:
15/08/11 09:46:01 INFO CodecPool: Got brand-new compressor [.gz]
15/08/11 09:46:01 INFO CodecPool: Got brand-new compressor [.gz]
15/08/11 09:46:01 INFO CodecPool: Got brand-new compressor [.gz]
15/08/11 09:46:07 ERROR InsertIntoHadoopFsRelation: Aborting task.
java.lang.OutOfMemoryError: Java heap space
15/08/11 09:46:09 ERROR InsertIntoHadoopFsRelation: Aborting task.
java.lang.OutOfMemoryError: Java heap space
15/08/11 09:46:08 ERROR InsertIntoHadoopFsRelation: Aborting task.
java.lang.OutOfMemoryError: Java heap space
15/08/11 09:46:08 ERROR InsertIntoHadoopFsRelation: Aborting task.
java.lang.OutOfMemoryError: Java heap space
15/08/11 09:46:09 ERROR InsertIntoHadoopFsRelation: Aborting task.
java.lang.OutOfMemoryError: Java heap space
15/08/11 09:46:09 ERROR InsertIntoHadoopFsRelation: Aborting task.
java.lang.OutOfMemoryError: Java heap space
15/08/11 09:46:08 ERROR InsertIntoHadoopFsRelation: Aborting task.
java.lang.OutOfMemoryError: Java heap space
15/08/11 09:46:07 ERROR InsertIntoHadoopFsRelation: Aborting task.
java.lang.OutOfMemoryError: Java heap space
15/08/11 09:46:07 ERROR InsertIntoHadoopFsRelation: Aborting task.
java.lang.OutOfMemoryError: Java heap space
at 
parquet.bytes.CapacityByteArrayOutputStream.initSlabs(CapacityByteArrayOutputStream.java:65)
at 
parquet.bytes.CapacityByteArrayOutputStream.init(CapacityByteArrayOutputStream.java:57)
at 
parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.init(ColumnChunkPageWriteStore.java:68)
at 
parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.init(ColumnChunkPageWriteStore.java:48)
at 
parquet.hadoop.ColumnChunkPageWriteStore.getPageWriter(ColumnChunkPageWriteStore.java:215)
at 
parquet.column.impl.ColumnWriteStoreImpl.newMemColumn(ColumnWriteStoreImpl.java:67)
at 
parquet.column.impl.ColumnWriteStoreImpl.getColumnWriter(ColumnWriteStoreImpl.java:56)
at 
parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.init(MessageColumnIO.java:178)
at parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:369)
at 
parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:108)
at 
parquet.hadoop.InternalParquetRecordWriter.init(InternalParquetRecordWriter.java:94)
at 
parquet.hadoop.ParquetRecordWriter.init(ParquetRecordWriter.java:64)
at 
parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:282)
at 
parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
at 
org.apache.spark.sql.parquet.ParquetOutputWriter.init(newParquet.scala:83)
at 
org.apache.spark.sql.parquet.ParquetRelation2$$anon$4.newInstance(newParquet.scala:229)
at 
org.apache.spark.sql.sources.DefaultWriterContainer.initWriters(commands.scala:470)
at 
org.apache.spark.sql.sources.BaseWriterContainer.executorSideSetup(commands.scala:360)
at 
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:172)
at 
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:160)
at 
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:160)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
15/08/11 09:46:08 ERROR InsertIntoHadoopFsRelation: Aborting task.
...
...
.
15/08/11 09:46:10 ERROR DefaultWriterContainer: Task attempt 
attempt_201508110946__m_11_0 aborted.
15/08/11 09:46:10 ERROR Executor: Exception in task 31.0 in stage 0.0 (TID 31)
org.apache.spark.SparkException: Task failed while writing rows.
at 

Does print/event logging affect performance?

2015-08-11 Thread Saif.A.Ellafi
Hi all,

Silly question: does logging info messages, whether printed or to a file, or 
event logging, cause any impact on the general performance of Spark?

Saif



Spark master driver UI: How to keep it after process finished?

2015-08-07 Thread Saif.A.Ellafi
Hi,

A silly question here. The Driver Web UI dies when the spark-submit program 
finishes. I would like some time to analyze after the program ends, but the page 
does not refresh itself, and when I hit F5 I lose all the info.

Thanks,
Saif



RE: Spark master driver UI: How to keep it after process finished?

2015-08-07 Thread Saif.A.Ellafi
Hello, thank you, but that port is unreachable for me. Can you please share 
where I can find that port's equivalent in my environment?

Thank you
Saif

From: François Pelletier [mailto:newslett...@francoispelletier.org]
Sent: Friday, August 07, 2015 4:38 PM
To: user@spark.apache.org
Subject: Re: Spark master driver UI: How to keep it after process finished?

Hi, all spark processes are saved in the Spark History Server

look at your host on port 18080 instead of 4040

François
On 2015-08-07 15:26, saif.a.ell...@wellsfargo.com wrote:
Hi,

A silly question here. The Driver Web UI dies when the spark-submit program 
finishes. I would like some time to analyze after the program ends, but the page 
does not refresh itself, and when I hit F5 I lose all the info.

Thanks,
Saif




Possible bug: JDBC with Speculative mode launches orphan queries

2015-08-07 Thread Saif.A.Ellafi
Hello,

When enabling speculation, my first job is to launch a partitioned JDBC 
dataframe query, in which some partitions take longer than others to respond.

This triggers speculation, and the query is launched again on other nodes. When 
one of those nodes finishes the query, the speculative one remains forever 
connected to the database and never ends.
I have to go to the database management tools and interrupt the query. This 
does not affect the Spark program, since the JDBC task has already ended. I 
only get log messages saying that the task execution has been ignored since 
the speculated task already finished.

Is there any way to enable/disable speculation for a specific task to avoid 
this? Do you have any suggestions? Or how can I report this issue?

Saif



RE: Too many open files

2015-07-29 Thread Saif.A.Ellafi
Thank you both, I will take a look, but


1.   For high-shuffle tasks, is it right for the system to have the size 
and thresholds set high? I hope there are no bad consequences.

2.   I will try to work around the lack of admin access and see if I can get 
anything done with only user rights.

From: Ted Yu [mailto:yuzhih...@gmail.com]
Sent: Wednesday, July 29, 2015 12:59 PM
To: Ellafi, Saif A.
Cc: user@spark.apache.org
Subject: Re: Too many open files

Please increase limit for open files:

http://stackoverflow.com/questions/34588/how-do-i-change-the-number-of-open-files-limit-in-linux


On Jul 29, 2015, at 8:39 AM, saif.a.ell...@wellsfargo.com wrote:
Hello,

I’ve seen a couple emails on this issue but could not find anything to solve my 
situation.

I tried to reduce the partitioning level, enable consolidateFiles and increase 
the sizeInFlight limit, but still no help. The spill manager is sort, which is the 
default. Any advice?

15/07/29 10:37:01 WARN TaskSetManager: Lost task 34.0 in stage 11.0 (TID 331, 
localhost): FetchFailed(BlockManagerId(driver, localhost, 43437), shuffleId=3, 
mapId=0, reduceId=34, message=
org.apache.spark.shuffle.FetchFailedException: 
/tmp/spark-71109b28-0f89-4e07-a521-5ff0a943472a/blockmgr-eda0751d-fd21-4229-93b0-2ee2546edf5a/0d/shuffle_3_0_0.index
 (Too many open files)
..
..
15/07/29 10:37:01 INFO Executor: Executor is trying to kill task 9.0 in stage 
11.0 (TID 306)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 20 in 
stage 11.0 failed 1 times, most recent failure: Lost task 20.0 in stage 11.0 
(TID 317, localhost): java.io.FileNotFoundException: 
/tmp/spark-71109b28-0f89-4e07-a521-5ff0a943472a/blockmgr-eda0751d-fd21-4229-93b0-2ee2546edf5a/1b/temp_shuffle_a3a9815a-677a-4342-94a2-1e083d758bcc
 (Too many open files)

my fs is ext4 and currently ulimit -n is 1024

Thanks
Saif



Fighting against performance: JDBC RDD badly distributed

2015-07-28 Thread Saif.A.Ellafi
Hi all,

I am experimenting and learning about performance on big tasks locally, with a 
32-core node and more than 64 GB of RAM; data is loaded from a database through 
a JDBC driver, and I launch heavy computations against it. I am presented with 
two questions:

1.  My RDD is poorly distributed. I am partitioning into 32 pieces, but the 
first 31 pieces are extremely lightweight compared to piece 32

  15/07/28 13:37:48 INFO Executor: Finished task 30.0 in stage 0.0 (TID 
30). 1419 bytes result sent to driver
  15/07/28 13:37:48 INFO TaskSetManager: Starting task 31.0 in stage 0.0 
(TID 31, localhost, PROCESS_LOCAL, 1539 bytes)
  15/07/28 13:37:48 INFO Executor: Running task 31.0 in stage 0.0 (TID 31)
  15/07/28 13:37:48 INFO TaskSetManager: Finished task 30.0 in stage 0.0 
(TID 30) in 2798 ms on localhost (31/32)
  15/07/28 13:37:48 INFO CacheManager: Partition rdd_2_31 not found, 
computing it
  ...All pieces take 3 seconds while the last one takes around 15 minutes to 
compute...

  Is there anything I can do about this? Preferably without reshuffling, 
i.e. in the DataFrameReader JDBC options (lowerBound, upperBound, partition 
column); a sketch follows after item 2 below.

2.  After a long time of processing, I sometimes get OOMs; I fail to find a 
how-to for falling back and retrying from already persisted data, to avoid losing time.
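
As referenced in item 1, a sketch of the partitioned variant of read.jdbc, where 
the URL, table, credentials and partition column are all placeholders:

import java.util.Properties

// Spark issues one query per partition, slicing [lowerBound, upperBound] on the column.
val props = new Properties()
props.setProperty("user", "me")           // placeholder credentials
props.setProperty("password", "secret")

val df = sqlContext.read.jdbc(
  "jdbc:postgresql://host:5432/db",       // url (placeholder)
  "big_table",                            // table
  "numeric_pk",                           // partition column: ideally roughly uniform
  1L,                                     // lowerBound
  50000000L,                              // upperBound
  32,                                     // numPartitions
  props)

If no single column is uniform enough, a common workaround is to partition on a 
derived value (for example a MOD of a numeric key) exposed through a view or a 
subquery passed as the table name.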

Thanks,
Saif



RE: Fighting against performance: JDBC RDD badly distributed

2015-07-28 Thread Saif.A.Ellafi
Thank you for your response Zhen,

I am using some vendor-specific JDBC driver JAR file (honestly I don't know 
where it came from). Its API is NOT like JdbcRDD; instead, it is more like the 
jdbc method of DataFrameReader
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameReader

So I ask two questions now:

1.   Will running a query with JdbcRDD perform better than bringing in the entire table as a DataFrame? I am converting back to RDDs later on anyway.

2.   I lack proper criteria for choosing a partition column. My table has more than 400 columns.

Saif

From: shenyan zhen [mailto:shenya...@gmail.com]
Sent: Tuesday, July 28, 2015 4:16 PM
To: Ellafi, Saif A.
Cc: user@spark.apache.org
Subject: Re: Fighting against performance: JDBC RDD badly distributed

Hi Saif,

Are you using JdbcRDD directly from Spark?
If yes, then the poor distribution could be due to the bound key you used.

See the JdbcRDD Scala doc at 
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.JdbcRDD:
sql: the text of the query. The query must contain two ? placeholders for parameters used to partition the results. E.g. select title, author from books where ? <= id and id <= ?

lowerBound: the minimum value of the first placeholder

upperBound: the maximum value of the second placeholder. The lower and upper bounds are inclusive.

numPartitions: the number of partitions. Given a lowerBound of 1, an upperBound of 20, and a numPartitions of 2, the query would be executed twice, once with (1, 10) and once with (11, 20)
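
The same bounds apply to the DataFrameReader route asked about in this thread; here is a hedged sketch with made-up connection, table and column names, where the partition column should be numeric and roughly evenly spread between the bounds.

import java.util.Properties

// Sketch: every name here is a placeholder; sqlContext is the existing SQLContext.
val props = new Properties()
val df = sqlContext.read.jdbc(
  "jdbc:somevendor://host:1234/db",   // url
  "big_table",                        // table
  "id",                               // numeric partition column
  1L,                                 // lowerBound
  2000000L,                           // upperBound
  32,                                 // numPartitions
  props)

A skewed or sparse partition column reproduces exactly the "31 light pieces, one heavy piece" behaviour described in this thread.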

Shenyan


On Tue, Jul 28, 2015 at 2:41 PM, 
saif.a.ell...@wellsfargo.com wrote:
Hi all,

I am experimenting and learning performance on big tasks locally, with a 32 
cores node and more than 64GB of Ram, data is loaded from a database through 
JDBC driver, and launching heavy computations against it. I am presented with 
two questions:

1.   My RDD is poorly distributed. I am partitioning into 32 pieces, but 
first 31 pieces are extremely lightweight compared to piece 32

15/07/28 13:37:48 INFO Executor: Finished task 30.0 in stage 0.0 (TID 30). 1419 
bytes result sent to driver
15/07/28 13:37:48 INFO TaskSetManager: Starting task 31.0 in stage 0.0 (TID 31, 
localhost, PROCESS_LOCAL, 1539 bytes)
15/07/28 13:37:48 INFO Executor: Running task 31.0 in stage 0.0 (TID 31)
15/07/28 13:37:48 INFO TaskSetManager: Finished task 30.0 in stage 0.0 (TID 30) 
in 2798 ms on localhost (31/32)
15/07/28 13:37:48 INFO CacheManager: Partition rdd_2_31 not found, computing it
...All pieces take 3 seconds while last one takes around 15 minutes to 
compute...

Is there anything I can do about this? Preferably without reshuffling, i.e. via the DataFrameReader JDBC options (lowerBound, upperBound, partition column).

2.   After a long time of processing I sometimes get OOMs, and I can't find a how-to for falling back and retrying from already persisted data so that time isn't lost.

Thanks,
Saif




CPU Parallelization not being used (local mode)

2015-07-27 Thread Saif.A.Ellafi
Hi all,

I would like some insight. I am currently processing huge databases and playing with monitoring and tuning.

When monitoring the multiple cores I have, I see that even when RDDs are parallelized, computation on the RDD jumps from core to core sporadically (I guess depending on where the chunk is). So I see one core at 100% usage and the other ones sitting idle; after some time, when the task is complete, the processing jumps to another core, and so on.

Can you share any general insight on this situation? Does this depend on the computation?
I have tried serialization and different setups, but I never see more than one core working during a spark-submit.

note: This is no cluster mode, just local processors.

Thanks,
Saif
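
One likely culprit, offered as a guess rather than a confirmed diagnosis: in local mode the number of worker threads comes from the master URL, so plain local runs everything on a single thread while local[*] uses one thread per core. A minimal sketch:

import org.apache.spark.{SparkConf, SparkContext}

// Sketch: local[*] gives one worker thread per core (or local[8] for an explicit count).
val conf = new SparkConf()
  .setAppName("local-parallelism")
  .setMaster("local[*]")
val sc = new SparkContext(conf)

// Give the scheduler enough partitions that every thread has work:
val rdd = sc.parallelize(1 to 1000000, 32)
println(rdd.map(_ * 2).reduce(_ + _))

The same applies on the command line via --master local[*] with spark-submit.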



[MLLIB] Anyone tried correlation with RDD[Vector] ?

2015-07-23 Thread Saif.A.Ellafi
I tried with an RDD[DenseVector], but RDD is not covariant in its type parameter, so an RDD[DenseVector] is not an RDD[Vector], and I can't use the RDD input method of correlation.

Thanks,
Saif
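
For reference, a minimal sketch of one way around this, with made-up numbers: declare the element type as Vector up front so the RDD is an RDD[Vector] from the start, then hand it to Statistics.corr (sc is assumed to be the shell's SparkContext).

import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.rdd.RDD

// Vectors.dense returns a Vector, so with the explicit annotation this is RDD[Vector].
val data: RDD[Vector] = sc.parallelize(Seq(
  Vectors.dense(1.0, 2.0, 3.0),
  Vectors.dense(2.0, 4.1, 6.0),
  Vectors.dense(3.0, 6.2, 9.1)))

val corrMatrix = Statistics.corr(data, "pearson")
println(corrMatrix)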



RE: Select all columns except some

2015-07-17 Thread Saif.A.Ellafi
Hello, thank you for your time.

Seq[String] works perfectly fine. I also tried running a for loop through all 
elements to see if any access to a value was broken, but no, they are alright.

For now, I solved it by calling this. Sadly, it takes a lot of time, but it works:

var data_sas = sqlContext.read.format("com.github.saurfang.sas.spark").load("/path/to/file.s")
data_sas.cache
for (col <- clean_cols) {
  data_sas = data_sas.drop(col)
}
data_sas.unpersist

Saif


From: Yana Kadiyska [mailto:yana.kadiy...@gmail.com]
Sent: Thursday, July 16, 2015 12:58 PM
To: Ellafi, Saif A.
Cc: user@spark.apache.org
Subject: Re: Select all columns except some

Have you tried to examine what clean_cols contains -- I'm suspicious of this part: mkString(", ").
Try this:
val clean_cols : Seq[String] = df.columns...

If you get a type error you need to work on clean_cols (I suspect yours is of type String at the moment and presents itself to Spark as a single column name with commas embedded).

Not sure why the .drop call hangs but in either case drop returns a new 
dataframe -- it's not a setter call
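
A sketch of the varargs form that sidesteps both pitfalls discussed here, assuming df is the loaded DataFrame:

// clean_cols stays a Seq[String] and is expanded into select's varargs,
// never joined into one comma-separated string.
val clean_cols: Seq[String] = df.columns.filterNot(_.startsWith("STATE_")).toSeq
val cleaned = df.select(clean_cols.head, clean_cols.tail: _*)

// Equivalent via drop; drop returns a new DataFrame, so keep the result.
val cleaned2 = df.columns.filter(_.startsWith("STATE_")).foldLeft(df)((d, c) => d.drop(c))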

On Thu, Jul 16, 2015 at 10:57 AM, 
saif.a.ell...@wellsfargo.com wrote:
Hi,

In a hundred columns dataframe, I wish to either select all of them except or 
drop the ones I dont want.

I am failing in doing such simple task, tried two ways

val clean_cols = df.columns.filterNot(col_name => col_name.startsWith("STATE_")).mkString(", ")
df.select(clean_cols)

But this throws exception:
org.apache.spark.sql.AnalysisException: cannot resolve 'asd_dt, 
industry_area,...’
at 
org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
 at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:63)
 at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:52)
 at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:286)
 at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:286)
 at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)
 at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:285) at 
org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$transformExpressionUp$1(QueryPlan.scala:108)
 at 
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2$$anonfun$apply$2.apply(QueryPlan.scala:123)

The other thing I tried is

val cols = df.columns.filter(col_name => col_name.startsWith("STATE_"))
for (col <- cols) df.drop(col)

But this other thing doesn’t do anything or hangs up.

Saif






Select all columns except some

2015-07-16 Thread Saif.A.Ellafi
Hi,

In a hundred columns dataframe, I wish to either select all of them except or 
drop the ones I dont want.

I am failing in doing such simple task, tried two ways

val clean_cols = df.columns.filterNot(col_name => col_name.startsWith("STATE_")).mkString(", ")
df.select(clean_cols)

But this throws exception:
org.apache.spark.sql.AnalysisException: cannot resolve 'asd_dt, 
industry_area,...'
at 
org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
 at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:63)
 at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:52)
 at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:286)
 at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:286)
 at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)
 at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:285) at 
org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$transformExpressionUp$1(QueryPlan.scala:108)
 at 
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2$$anonfun$apply$2.apply(QueryPlan.scala:123)

The other thing I tried is

val cols = df.columns.filter(col_name => col_name.startsWith("STATE_"))
for (col <- cols) df.drop(col)

But this other thing doesn't do anything or hangs up.

Saif





RE: java.io.InvalidClassException

2015-07-13 Thread Saif.A.Ellafi
Thank you, extending Serializable solved the issue. I am left with more 
questions than answers though :-).

Regards,
Saif

From: Yana Kadiyska [mailto:yana.kadiy...@gmail.com]
Sent: Monday, July 13, 2015 2:49 PM
To: Ellafi, Saif A.
Cc: user@spark.apache.org; Liu, Weicheng
Subject: Re: java.io.InvalidClassException

I would certainly try to mark the Validator class as Serializable...If that 
doesn't do it you can also try and see if this flag sheds more light:  
-Dsun.io.serialization.extendedDebugInfo=true

 By programming guide I mean this: 
https://spark.apache.org/docs/latest/programming-guide.html I could almost 
swear I had seen an extended section on tricky serialization issues (i.e. 
scenarios where you end up serializing more than you think because of what your 
closure captures) but I can't locate this section now...
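
For reference, a trimmed sketch of what that advice looks like applied to the code quoted below, consistent with the fix reported earlier in the thread (extending Serializable); the comparison operator is a guess, since it was lost in the archived message.

import org.apache.spark.sql.Row

// Top-level (not defined inside an enclosing class or a REPL line) and Serializable.
abstract class Validator extends Serializable {
  val shortsale_in_pos = 10
  def validate(input: Row): Validator
}

case object Nomatch extends Validator {
  def validate(input: Row): Validator = this
}

case object Shortsale extends Validator {
  def validate(input: Row): Validator =
    if (input.getDouble(shortsale_in_pos) > 140.0) this else Nomatch
}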

On Mon, Jul 13, 2015 at 1:30 PM, 
saif.a.ell...@wellsfargo.com wrote:
Thank you very much for your time, here is how I designed the case classes, as 
far as I know they apply properly.

Ps: By the way, what do you mean by “The programming guide?”

abstract class Validator {

// positions to access with Row.getInt(x)
val shortsale_in_pos = 10
val month_pos = 11
val foreclosure_start_dt_pos = 14
val filemonth_dt_pos = 12
val reo_start_dt_pos = 14
// ..

// redesign into Iterable of Rows --
def validate(input: org.apache.spark.sql.Row): Validator

}

case object Nomatch extends Validator {
def validate(input: Row): Validator = this
  }

case object Shortsale extends Validator {
def validate(input: Row): Validator = {
var check1: Boolean = if (input.getDouble(shortsale_in_pos) > 140.0) true else false
if (check1) this else Nomatch
}
}

Saif

From: Yana Kadiyska 
[mailto:yana.kadiy...@gmail.com]
Sent: Monday, July 13, 2015 2:16 PM
To: Ellafi, Saif A.
Cc: user@spark.apache.org
Subject: Re: java.io.InvalidClassException

It's a bit hard to tell from the snippets of code but it's likely related to 
the fact that when you serialize instances the enclosing class, if any, also 
gets serialized, as well as any other place where fields used in the closure 
come from...e.g.check this discussion: 
http://stackoverflow.com/questions/9747443/java-io-invalidclassexception-no-valid-constructor

The programming guide also has good advice on serialization issues. I would 
particulary check how Shortsale/Nomatch/Foreclosure are declared (I'd advise 
making them top-level case classes)...

On Mon, Jul 13, 2015 at 12:32 PM, 
saif.a.ell...@wellsfargo.com wrote:
Hi,

For some experiment I am doing, I am trying to do the following.

1.Created an abstract class Validator. Created case objects from Validator with 
validate(row: Row): Boolean method.

2. Adding in a list all case objects

3. Each validate takes a Row into account, returns “itself” if validate returns 
true, so then, I do this to return an arbitrary number for each match
def evaluate_paths(row: Row, validators: List[ClientPath]): Int = {

var result: Int = -1

for (validator <- validators) {
validator.validate(row) match {
case Shortsale => result = 0
case Foreclosure => result = 1
case Nomatch => result = 99
//...
}
}
result
}

val validators = List[ClientPath](
Shortsale,
Foreclosure)

4. Then I run the map[Int](row => evaluate_paths(row, validators))

But this blows up: it does not like the creation of the list of validators when executing an action on the RDD such as take(1).
I have also tried an Iterator and an Array instead of a List, but no luck. I also replaced the for loop with a while loop.
Curiously, when I use custom-made Rows, the execution works properly when calling evaluate_paths(some_row, validators).

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 125.0 failed 1 times, most recent failure: Lost task 0.0 in stage 125.0 
(TID 830, localhost): java.io.InvalidClassException: 
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$Nomatch$;
 no valid constructor at 
java.io.ObjectStreamClass$ExceptionInfo.newInvalidClassException(ObjectStreamClass.java:150)
 at java.io.ObjectStreamClass.checkDeserialize(ObjectStreamClass.java:768) at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1775) at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) at 

RE: java.io.InvalidClassException

2015-07-13 Thread Saif.A.Ellafi
Thank you very much for your time, here is how I designed the case classes, as 
far as I know they apply properly.

Ps: By the way, what do you mean by “The programming guide?”

abstract class Validator {

// positions to access with Row.getInt(x)
val shortsale_in_pos = 10
val month_pos = 11
val foreclosure_start_dt_pos = 14
val filemonth_dt_pos = 12
val reo_start_dt_pos = 14
// ..

// redesign into Iterable of Rows --
def validate(input: org.apache.spark.sql.Row): Validator

}

case object Nomatch extends Validator {
def validate(input: Row): Validator = this
  }

case object Shortsale extends Validator {
def validate(input: Row): Validator = {
var check1: Boolean = if (input.getDouble(shortsale_in_pos) > 140.0) true else false
if (check1) this else Nomatch
}
}

Saif

From: Yana Kadiyska [mailto:yana.kadiy...@gmail.com]
Sent: Monday, July 13, 2015 2:16 PM
To: Ellafi, Saif A.
Cc: user@spark.apache.org
Subject: Re: java.io.InvalidClassException

It's a bit hard to tell from the snippets of code but it's likely related to 
the fact that when you serialize instances the enclosing class, if any, also 
gets serialized, as well as any other place where fields used in the closure 
come from...e.g.check this discussion: 
http://stackoverflow.com/questions/9747443/java-io-invalidclassexception-no-valid-constructor

The programming guide also has good advice on serialization issues. I would 
particulary check how Shortsale/Nomatch/Foreclosure are declared (I'd advise 
making them top-level case classes)...

On Mon, Jul 13, 2015 at 12:32 PM, 
saif.a.ell...@wellsfargo.com wrote:
Hi,

For some experiment I am doing, I am trying to do the following.

1.Created an abstract class Validator. Created case objects from Validator with 
validate(row: Row): Boolean method.

2. Adding in a list all case objects

3. Each validate takes a Row into account, returns “itself” if validate returns 
true, so then, I do this to return an arbitrary number for each match
def evaluate_paths(row: Row, validators: List[ClientPath]): Int = {

var result: Int = -1

for (validator <- validators) {
validator.validate(row) match {
case Shortsale => result = 0
case Foreclosure => result = 1
case Nomatch => result = 99
//...
}
}
result
}

val validators = List[ClientPath](
Shortsale,
Foreclosure)

4. Then I run the map[Int](row => evaluate_paths(row, validators))

But this blows up: it does not like the creation of the list of validators when executing an action on the RDD such as take(1).
I have also tried an Iterator and an Array instead of a List, but no luck. I also replaced the for loop with a while loop.
Curiously, when I use custom-made Rows, the execution works properly when calling evaluate_paths(some_row, validators).

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 125.0 failed 1 times, most recent failure: Lost task 0.0 in stage 125.0 
(TID 830, localhost): java.io.InvalidClassException: 
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$Nomatch$;
 no valid constructor at 
java.io.ObjectStreamClass$ExceptionInfo.newInvalidClassException(ObjectStreamClass.java:150)
 at java.io.ObjectStreamClass.checkDeserialize(ObjectStreamClass.java:768) at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1775) at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
...
...
...
at 
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:69)
 at 
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:95)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:58) at 
org.apache.spark.scheduler.Task.run(Task.scala:70) at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace: at 

java.io.InvalidClassException

2015-07-13 Thread Saif.A.Ellafi
Hi,

For some experiment I am doing, I am trying to do the following.

1.Created an abstract class Validator. Created case objects from Validator with 
validate(row: Row): Boolean method.

2. Adding in a list all case objects

3. Each validate takes a Row into account, returns itself if validate returns 
true, so then, I do this to return an arbitrary number for each match

def evaluate_paths(row: Row, validators: List[ClientPath]): Int = {

var result: Int = -1

for (validator <- validators) {
validator.validate(row) match {
case Shortsale => result = 0
case Foreclosure => result = 1
case Nomatch => result = 99
//...
}
}
result
}

val validators = List[ClientPath](
Shortsale,
Foreclosure)

4. Then I run the map[Int](row => evaluate_paths(row, validators))

But this blows up: it does not like the creation of the list of validators when executing an action on the RDD such as take(1).
I have also tried an Iterator and an Array instead of a List, but no luck. I also replaced the for loop with a while loop.
Curiously, when I use custom-made Rows, the execution works properly when calling evaluate_paths(some_row, validators).

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 125.0 failed 1 times, most recent failure: Lost task 0.0 in stage 125.0 
(TID 830, localhost): java.io.InvalidClassException: 
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$Nomatch$;
 no valid constructor at 
java.io.ObjectStreamClass$ExceptionInfo.newInvalidClassException(ObjectStreamClass.java:150)
 at java.io.ObjectStreamClass.checkDeserialize(ObjectStreamClass.java:768) at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1775) at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
...
...
...
at 
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:69)
 at 
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:95)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:58) at 
org.apache.spark.scheduler.Task.run(Task.scala:70) at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace: at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
 at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
 at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
 at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256) at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
 at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
 at scala.Option.foreach(Option.scala:236) at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
 at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
 at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
 at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

 --

Any advice grateful
Saif



MLLIB RDD segmentation for logistic regression

2015-07-13 Thread Saif.A.Ellafi
Hello all,

I have one big RDD, in which there is a column of groups A1, A2, B1, B2, B3, 
C1, D1, ..., XY.
Out of it, I am using map() to transform into an RDD[LabeledPoint] with dense vectors, for later use in Logistic Regression, which takes RDD[LabeledPoint].
I would like to run a logistic regression for each one of these N groups (the group is NOT part of any features used in the model itself), but I could not find a proper way.

1.  Can't programmatically create sub-RDDs with a loop: org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations;

2.  Can't create RDDs manually with split() since the number of groups is large and unknown

3.  Pair RDDs seemed a tempting choice with the reduce/combine/values-by-key functions, but none of them returns a data type usable as an RDD[LabeledPoint], which is ultimately the input for Logistic Regression. Any programmatic way to get sub-RDDs brings me back to item 1.

The logit is a simple binary dependent variable over n features; I just need to run one logit for each group.
There may be some mathematical equivalent that runs this as one big regression, but so far I'm out of ideas.

Saif
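
One workaround consistent with constraint 1 above, sketched with placeholder names and assuming the labeled points can be keyed by group upstream: collect the distinct group keys to the driver, then filter and fit one model per key.

import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

// Sketch: byGroup is built upstream from the big RDD, keyed by the group column.
val byGroup: RDD[(String, LabeledPoint)] = ???
byGroup.cache()

// The distinct keys are assumed to be few enough to collect to the driver.
val groups = byGroup.keys.distinct().collect()

val models = groups.map { g =>
  val subset = byGroup.filter(_._1 == g).values   // RDD[LabeledPoint] for this group
  g -> new LogisticRegressionWithLBFGS().setNumClasses(2).run(subset)
}.toMap

Each filter pass rescans the cached RDD, so this trades simplicity for repeated scans; it is fine for a modest number of groups.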



How to deal with null values on LabeledPoint

2015-07-07 Thread Saif.A.Ellafi
Hello,

reading from spark-csv, got some lines with missing data (not invalid).

applying map() to create a LabeledPoint with a dense vector, using map( Row => Row.getDouble(col_index) )

To this point:
res173: org.apache.spark.mllib.regression.LabeledPoint = 
(-1.530132691E9,[162.89431,13.55811,18.3346818,-1.6653182])

As running the following code:

  val model = new LogisticRegressionWithLBFGS().
  setNumClasses(2).
  setValidateData(true).
  run(data_map)

  java.lang.RuntimeException: Failed to check null bit for primitive double 
value.

Debugging this, I am pretty sure this is because rows that look like 
-2.593849123898,392.293891



Spark-csv into labeled points with null values

2015-07-03 Thread Saif.A.Ellafi
Hello all,

I am learning scala spark and going through some applications with data I have. 
Please allow me to put a couple questions:

spark-csv: The data I have isn't malformed, but there are empty values in some rows, properly comma-separated and not caught by DROPMALFORMED mode.
These values are taken in as null values. My final mission is to create LabeledPoint vectors for MLlib, so my steps are:
a.  load csv
b.  cast column types to have a proper DataFrame schema
c.  apply map() to create a LabeledPoint with a dense vector, using map( Row => Row.getDouble(col_index) )

To this point:
res173: org.apache.spark.mllib.regression.LabeledPoint = 
(-1.530132691E9,[162.89431,13.55811,18.3346818,-1.6653182])

As running the following code:

  val model = new LogisticRegressionWithLBFGS().
  setNumClasses(2).
  setValidateData(true).
  run(data_map)

  java.lang.RuntimeException: Failed to check null bit for primitive double 
value.

Debugging this, I am pretty sure this is because rows that look like 
-2.593849123898,392.293891

Any suggestions to get round this?

Saif
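
A hedged sketch of two ways around this, assuming df is the spark-csv DataFrame; the column positions below are placeholders:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

val labelCol = 0
val featureCols = Seq(1, 2, 3, 4)        // placeholder column positions

// Option 1: discard any row containing a null before mapping.
val complete = df.na.drop()

// Option 2: keep only rows where every needed field is present, then build the points.
val points = df.rdd
  .filter(row => !row.isNullAt(labelCol) && featureCols.forall(i => !row.isNullAt(i)))
  .map(row => LabeledPoint(
    row.getDouble(labelCol),
    Vectors.dense(featureCols.map(i => row.getDouble(i)).toArray)))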