Re: Possible long lineage issue when using DStream to update a normal RDD

2015-05-07 Thread Chunnan Yao
Thank you for this suggestion! But may I ask what the advantage of using
checkpoint instead of cache is here? They both cut lineage. I only know that
checkpoint saves the RDD to disk, while cache keeps it in memory. So maybe
it's for reliability?

Also, on http://spark.apache.org/docs/latest/streaming-programming-guide.html,
I have not seen a usage of "foreachRDD" like mine. Here I am not pushing data
to an external system; I just use it to update an RDD in Spark. Is this right?
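
For concreteness, here is a minimal sketch of the checkpoint pattern being
discussed (the checkpoint directory, the union, and the every-N-batches
counter are my own illustrative choices, not from the original mails):

import org.apache.spark.rdd.RDD

ssc.sparkContext.setCheckpointDir("hdfs:///tmp/rdd-checkpoints")  // hypothetical path

var myRDD: RDD[(String, Int)] = ssc.sparkContext.emptyRDD[(String, Int)]
var batches = 0L
dstream.foreachRDD { rdd =>
  myRDD = myRDD.union(rdd)   // or a join, as in the original question
  batches += 1
  if (batches % 10 == 0) {   // periodically cut the lineage
    myRDD.checkpoint()       // mark for materialization on stable storage
    myRDD.count()            // force a job so the checkpoint actually happens
  }
}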



2015-05-08 14:03 GMT+08:00 Shao, Saisai :

> I think you could use checkpoint to cut the lineage of `MyRDD`; I have a
> similar scenario and I use checkpoint to work around this problem :)
>
> Thanks
> Jerry
>
> -Original Message-
> From: yaochunnan [mailto:yaochun...@gmail.com]
> Sent: Friday, May 8, 2015 1:57 PM
> To: user@spark.apache.org
> Subject: Possible long lineage issue when using DStream to update a normal
> RDD
>
> Hi all,
> Recently in our project, we need to update an RDD using data regularly
> received from a DStream. I plan to use the "foreachRDD" API to achieve this:
> var MyRDD = ...
> dstream.foreachRDD { rdd =>
>   MyRDD = MyRDD.join(rdd)...
>   ...
> }
>
> Is this usage correct? My concern is: as I am repeatedly and endlessly
> reassigning MyRDD in order to update it, will it create too long an RDD
> lineage to process when I want to query MyRDD later on (similar to
> https://issues.apache.org/jira/browse/SPARK-4672)?
>
> Maybe I should:
> 1. cache or checkpoint the latest MyRDD and unpersist the old MyRDD every
> time a dstream comes in, or
> 2. use the unpublished IndexedRDD
> (https://github.com/amplab/spark-indexedrdd) to conduct efficient RDD
> updates.
>
> As I lack experience using Spark Streaming and IndexedRDD, I am here to
> make sure my thoughts are on the right track. Your wise suggestions will be
> greatly appreciated.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Possible-long-lineage-issue-when-using-DStream-to-update-a-normal-RDD-tp22812.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
> commands, e-mail: user-h...@spark.apache.org
>
>


Re: Stop Cluster Mode Running App

2015-05-07 Thread James King
Many Thanks Silvio,

Someone also suggested using something similar:

./bin/spark-class org.apache.spark.deploy.Client kill <master-url> <driver-id>

Regards
jk


On Fri, May 8, 2015 at 2:12 AM, Silvio Fiorito <
silvio.fior...@granturing.com> wrote:

>   Hi James,
>
>  If you’re on Spark 1.3 you can use the kill command in spark-submit to
> shut it down. You’ll need the driver id from the Spark UI or from when you
> submitted the app.
>
>  spark-submit --master spark://master:7077 --kill <driver-id>
>
>  Thanks,
> Silvio
>
>   From: James King
> Date: Wednesday, May 6, 2015 at 12:02 PM
> To: user
> Subject: Stop Cluster Mode Running App
>
>   I submitted a Spark application in cluster mode, and now every time I
> stop the cluster and restart it the job resumes execution.
>
>  I even killed a daemon called DriverWrapper; it stops the app but it
> resumes again.
>
>  How can I stop this application from running?
>


Re: Mismatch when using sparkSQL to insert data into a hive table with a dynamic DateType partition

2015-05-07 Thread Gerald-G
Resolved by creating a UDF to match the different formats of DateType in
sparksql and Hive.

I suggest fixing this in the next release.
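
For reference, a hedged sketch of what such a UDF could look like (the name
and conversion are assumptions, not Gerald-G's actual code). It converts the
days-since-epoch int back into a 'yyyy-MM-dd' string; note that 16563 days
after 1970-01-01 is indeed 2015-05-08:

sqlContext.udf.register("days_to_date", (days: Int) => {
  val fmt = new java.text.SimpleDateFormat("yyyy-MM-dd")
  fmt.setTimeZone(java.util.TimeZone.getTimeZone("UTC"))
  fmt.format(new java.util.Date(days.toLong * 86400000L))  // days since 1970-01-01
})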

On Fri, May 8, 2015 at 11:07 AM, Gerald-G  wrote:

> Hi:
>
>  Spark version is 1.3.1
>
>  I used sparksql to insert data into a hive table with a DateType partition.
>
>  The partition column is translated to an int, which is the number of days
> since 1970-01-01.
>
> fat_tbUser is a very simple hive table with a DateType partition column
> dtstatdate and a data column userid.
>
>   "INSERT INTO TABLE fat_tbUser "
> + "PARTITION (dtstatdate) "
> + "SELECT userid, "
> + "'2015-05-08' AS dtstatdate "
> + "FROM fat_tbUser WHERE dtStatDate = date_add('2015-05-08', -2)"
>
>
> When I run it in sparkSQL, Hive gets dtstatdate=16563 instead of 2015-05-08.
>
>
> Any Suggestion?
>
> Thanks
> Yours
>
> Meng
>
>


RE: Possible long lineage issue when using DStream to update a normal RDD

2015-05-07 Thread Shao, Saisai
I think you could use checkpoint to cut the lineage of `MyRDD`; I have a
similar scenario and I use checkpoint to work around this problem :)

Thanks
Jerry

-Original Message-
From: yaochunnan [mailto:yaochun...@gmail.com] 
Sent: Friday, May 8, 2015 1:57 PM
To: user@spark.apache.org
Subject: Possible long lineage issue when using DStream to update a normal RDD

Hi all,
Recently in our project, we need to update an RDD using data regularly
received from a DStream. I plan to use the "foreachRDD" API to achieve this:
var MyRDD = ...
dstream.foreachRDD { rdd =>
  MyRDD = MyRDD.join(rdd)...
  ...
}

Is this usage correct? My concern is: as I am repeatedly and endlessly
reassigning MyRDD in order to update it, will it create too long an RDD lineage
to process when I want to query MyRDD later on (similar to
https://issues.apache.org/jira/browse/SPARK-4672)?

Maybe I should:
1. cache or checkpoint the latest MyRDD and unpersist the old MyRDD every time a
dstream comes in, or
2. use the unpublished IndexedRDD
(https://github.com/amplab/spark-indexedrdd) to conduct efficient RDD updates.

As I lack experience using Spark Streaming and IndexedRDD, I am here to make
sure my thoughts are on the right track. Your wise suggestions will be greatly
appreciated.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Possible-long-lineage-issue-when-using-DStream-to-update-a-normal-RDD-tp22812.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Possible long lineage issue when using DStream to update a normal RDD

2015-05-07 Thread yaochunnan
Hi all, 
Recently in our project, we need to update an RDD using data regularly
received from a DStream. I plan to use the "foreachRDD" API to achieve this:
var MyRDD = ...
dstream.foreachRDD { rdd => 
  MyRDD = MyRDD.join(rdd)...
  ...
}

Is this usage correct? My concern is: as I am repeatedly and endlessly
reassigning MyRDD in order to update it, will it create too long an RDD
lineage to process when I want to query MyRDD later on (similar to
https://issues.apache.org/jira/browse/SPARK-4672)?

Maybe I should:
1. cache or checkpoint the latest MyRDD and unpersist the old MyRDD every time a
dstream comes in, or
2. use the unpublished IndexedRDD
(https://github.com/amplab/spark-indexedrdd) to conduct efficient RDD
updates.

As I lack experience using Spark Streaming and IndexedRDD, I am here to make
sure my thoughts are on the right track. Your wise suggestions will be
greatly appreciated.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Possible-long-lineage-issue-when-using-DStream-to-update-a-normal-RDD-tp22812.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



(No subject)

2015-05-07 Thread luohui20001
Hi guys, I got a "PhoenixParserException: ERROR 601
(42P00): Syntax error. Encountered "FORMAT" at line 21, column 141."
when creating a table using "ROW FORMAT DELIMITED FIELDS TERMINATED
BY '\t' LINES TERMINATED BY '\n'". As I remember, previous
versions of Phoenix supported this grammar, didn't they? Actually I want to
load a CSV file of 1.7 million records into an empty table; however, after I
finished loading this file and counted the table, I got a result of 24
rows... How can I make the table ingest a multi-line file correctly?
Thanks.



 

Thanks&Best regards!
San.Luo


Re: Spark does not delete temporary directories

2015-05-07 Thread Sean Owen
You're referring to a comment in the generic utility method, not the
specific calls to it. The comment just says that the generic method
doesn't mark the directory for deletion. Individual uses of it might
need to.

One or more of these might be delete-able on exit, but in any event
it's just a directory. I think 'spark files' might intentionally stay
around since it outlives one JVM and might be shared across executors.
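
For illustration, a minimal sketch (not Spark's actual code) of what marking
a directory for deletion would look like, via a JVM shutdown hook:

import java.io.File

def deleteRecursively(f: File): Unit = {
  if (f.isDirectory) Option(f.listFiles()).toSeq.flatten.foreach(deleteRecursively)
  f.delete()  // delete children first, then the directory itself
}

def createTempDirDeletedOnExit(root: String): File = {
  val dir = new File(root, "spark-" + java.util.UUID.randomUUID.toString)
  dir.mkdirs()
  Runtime.getRuntime.addShutdownHook(new Thread {
    override def run(): Unit = deleteRecursively(dir)
  })
  dir
}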

On Fri, May 8, 2015 at 3:53 AM, Taeyun Kim  wrote:
> It seems that they are always empty.
>
>
>
> I've traced the spark source code.
>
> The module methods that create the 3 'temp' directories are as follows:
>
>
>
> - DiskBlockManager.createLocalDirs
>
> - HttpFileServer.initialize
>
> - SparkEnv.sparkFilesDir
>
>
>
> They (eventually) call Utils.getOrCreateLocalRootDirs and then
> Utils.createDirectory, which intentionally does NOT mark the directory for
> automatic deletion.
>
> The comment of createDirectory method says: "The directory is guaranteed to
> be newly created, and is not marked for automatic deletion."
>
> I don't know why they are not marked. Is this really intentional?
>
>
>
> From: Haopu Wang [mailto:hw...@qilinsoft.com]
> Sent: Friday, May 08, 2015 11:37 AM
> To: Taeyun Kim; Ted Yu; Todd Nist; user@spark.apache.org
>
>
> Subject: RE: Spark does not delete temporary directories
>
>
>
> I think the temporary folders are used to store blocks and shuffles. That
> doesn't depend on the cluster manager.
>
> Ideally they should be removed after the application has been terminated.
>
> Can you check if there are contents under those folders?
>
>
>
> 
>
> From: Taeyun Kim [mailto:taeyun@innowireless.com]
> Sent: Friday, May 08, 2015 9:42 AM
> To: 'Ted Yu'; 'Todd Nist'; user@spark.apache.org
> Subject: RE: Spark does not delete temporary directories
>
>
>
> Thanks, but it seems that the option is for Spark standalone mode only.
>
> I’ve (lightly) tested the options with local mode and yarn-client mode, the
> ‘temp’ directories were not deleted.
>
>
>
> From: Ted Yu [mailto:yuzhih...@gmail.com]
> Sent: Thursday, May 07, 2015 10:47 PM
> To: Todd Nist
> Cc: Taeyun Kim; user@spark.apache.org
> Subject: Re: Spark does not delete temporary directories
>
>
>
> Default value for spark.worker.cleanup.enabled is false:
>
>
> private val CLEANUP_ENABLED =
> conf.getBoolean("spark.worker.cleanup.enabled", false)
>
>
>
> I wonder if the default should be set as true.
>
>
>
> Cheers
>
>
>
> On Thu, May 7, 2015 at 6:19 AM, Todd Nist  wrote:
>
> Have you tried to set the following?
>
> spark.worker.cleanup.enabled=true
> spark.worker.cleanup.appDataTtl=<ttl in seconds>
>
>
>
>
>
> On Thu, May 7, 2015 at 2:39 AM, Taeyun Kim 
> wrote:
>
> Hi,
>
>
>
> After a spark program completes, there are 3 temporary directories remain in
> the temp directory.
>
> The file names are like this: spark-2e389487-40cc-4a82-a5c7-353c0feefbb7
>
>
>
> And when the Spark program runs on Windows, a snappy DLL file also remains in
> the temp directory.
>
> The file name is like this:
> snappy-1.0.4.1-6e117df4-97b6-4d69-bf9d-71c4a627940c-snappyjava
>
>
>
> They are created every time the Spark program runs. So the number of files
> and directories keeps growing.
>
>
>
> How can I have them deleted?
>
>
>
> Spark version is 1.3.1 with Hadoop 2.6.
>
>
>
> Thanks.
>
>
>
>
>
>
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Master node memory usage question

2015-05-07 Thread Richard Alex Hofer

Hi,
I'm working on a project in Spark and am trying to understand what's
going on. To try and understand what's happening, we came up
with this snippet of code, which very roughly resembles what we're
actually doing. When trying to run this, our master node quickly
uses up its memory even though all of our RDDs are very small. Can
someone explain what's going on here and how we can avoid it?


a = sc.parallelize(xrange(100), 10)
b = a

for i in xrange(10):
    a = a.map(lambda x: x + 1)
    if i % 300 == 0:
        # We do this to try and force some of our RDD to evaluate
        a.persist()
        a.foreachPartition(lambda _: None)
        b.unpersist()
        b = a
a.collect()
b.unpersist()

-Richard Hofer

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: YARN mode startup takes too long (10+ secs)

2015-05-07 Thread Taeyun Kim
I think I’ve found the (maybe partial, but major) reason.

 

It’s between the following lines, (it’s newly captured, but essentially the 
same place that Zoltán Zvara picked:

 

15/05/08 11:36:32 INFO BlockManagerMaster: Registered BlockManager

15/05/08 11:36:38 INFO YarnClientSchedulerBackend: Registered executor: 
Actor[akka.tcp://sparkExecutor@cluster04:55237/user/Executor#-149550753] with 
ID 1

 

When I read the logs on the cluster side, the following lines were found (the
exact times differ from the lines above, but that's just the clock difference
between machines):

 

15/05/08 11:36:23 INFO yarn.ApplicationMaster: Started progress reporter thread 
- sleep time : 5000

15/05/08 11:36:28 INFO impl.AMRMClientImpl: Received new token for : 
cluster04:45454

 

It seems that Spark deliberately sleeps for 5 seconds.

I've read the Spark source code, and in
org.apache.spark.deploy.yarn.ApplicationMaster.scala, launchReporterThread()
has the code for that.

It loops, calling allocator.allocateResources() and Thread.sleep().

For the sleep, it reads the configuration variable
spark.yarn.scheduler.heartbeat.interval-ms (the default value is 5000, i.e.
5 seconds).

According to the comment, “we want to be reasonably responsive without causing
too many requests to RM”.

So, unless YARN immediately fulfills the allocation request, it seems that 5
seconds will be wasted.

 

When I modified the configuration variable to 1000, it only waited for 1 sec.

 

Here is the log lines after the change:

 

15/05/08 11:47:21 INFO yarn.ApplicationMaster: Started progress reporter thread 
- sleep time : 1000

15/05/08 11:47:22 INFO impl.AMRMClientImpl: Received new token for : 
cluster04:45454

 

4 secs saved.

So, when one does not want to wait 5 seconds, one can lower
spark.yarn.scheduler.heartbeat.interval-ms.

I hope that the additional overhead this incurs is negligible.
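
For anyone who wants to try this, a hedged example of lowering the interval
when building the SparkConf (value in milliseconds; weigh the saved seconds
against the extra load on the ResourceManager):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("MyApp")  // hypothetical app name
  .set("spark.yarn.scheduler.heartbeat.interval-ms", "1000")  // default is 5000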

 

 

From: Zoltán Zvara [mailto:zoltan.zv...@gmail.com] 
Sent: Thursday, May 07, 2015 10:05 PM
To: Taeyun Kim; user@spark.apache.org
Subject: Re: YARN mode startup takes too long (10+ secs)

 

Without considering everything, just a few hints:

You are running on YARN. From 09:18:34 to 09:18:37 your application is in state 
ACCEPTED. There is a noticeable overhead introduced due to communicating with 
YARN's ResourceManager, NodeManager and given that the YARN scheduler needs 
time to make a decision. I guess somewhere from 09:18:38 to 09:18:43 your 
application JAR gets copied to another container requested by the Spark 
ApplicationMaster deployed on YARN's container 0. Deploying an executor usually
needs further resource negotiations with the ResourceManager. Also, as I
said, your JAR and the Executor's code require copying to the container's local
directory - execution is blocked until that is complete.

 

On Thu, May 7, 2015 at 3:09 AM Taeyun Kim  wrote:

Hi,

 

I’m running a spark application with YARN-client or YARN-cluster mode.

But it seems to take too long to startup.

It takes 10+ seconds to initialize the spark context.

Is this normal? Or can it be optimized?

 

The environment is as follows:

- Hadoop: Hortonworks HDP 2.2 (Hadoop 2.6)

- Spark: 1.3.1

- Client: Windows 7, but similar result on CentOS 6.6

 

The following is the startup part of the application log. (Some private 
information was edited)

‘Main: Initializing context’ on the first line and ‘MainProcessor: Deleting
previous output files’ on the last line are logs from the application; the
others in between are from Spark itself. The application logic is executed
after this log is displayed.

 

---

 

15/05/07 09:18:31 INFO Main: Initializing context

15/05/07 09:18:31 INFO SparkContext: Running Spark version 1.3.1

15/05/07 09:18:31 INFO SecurityManager: Changing view acls to: myuser,myapp

15/05/07 09:18:31 INFO SecurityManager: Changing modify acls to: myuser,myapp

15/05/07 09:18:31 INFO SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users with view permissions: Set(myuser, myapp); 
users with modify permissions: Set(myuser, myapp)

15/05/07 09:18:31 INFO Slf4jLogger: Slf4jLogger started

15/05/07 09:18:31 INFO Remoting: Starting remoting

15/05/07 09:18:31 INFO Remoting: Remoting started; listening on addresses 
:[akka.tcp://sparkDriver@mymachine:54449]

15/05/07 09:18:31 INFO Utils: Successfully started service 'sparkDriver' on 
port 54449.

15/05/07 09:18:31 INFO SparkEnv: Registering MapOutputTracker

15/05/07 09:18:32 INFO SparkEnv: Registering BlockManagerMaster

15/05/07 09:18:32 INFO DiskBlockManager: Created local directory at 
C:\Users\myuser\AppData\Local\Temp\spark-2d3db9d6-ea78-438e-956f-be9c1dcf3a9d\blockmgr-e9ade223-a4b8-4d9f-b038-efd66adf9772

15/05/07 09:18:32 INFO MemoryStore: MemoryStore started with capacity 1956.7 MB

15/05/07 09:18:32 INFO HttpFileServer: HTTP File server directory is 
C:\Users\myuser\AppData\Local\Temp\spark-ff40d73b-e8ab-433e-88c4-35da27fb6278\httpd-def9220f-ac3a-4dd2-9ac1-2c593b94b2d9

15/05/07 09:18:32 I

Discretization

2015-05-07 Thread spark_user_2015
The Spark documentation shows the following example code:

// Discretize data in 16 equal bins since ChiSqSelector requires categorical
// features
val discretizedData = data.map { lp =>
  LabeledPoint(lp.label, Vectors.dense(lp.features.toArray.map { x => x / 16 }))
}

I'm sort of missing why "x / 16" is considered a discretization approach
here. 

[https://spark.apache.org/docs/latest/mllib-feature-extraction.html#feature-selection]



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Discretization-tp22811.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: SparkSQL issue: Spark 1.3.1 + hadoop 2.6 on CDH5.3 with parquet

2015-05-07 Thread felicia
Hi all,

Thanks for the help on this case!
we finally settled this by adding a jar named parquet-hive-bundle-1.5.0.jar
when submitting jobs through spark-submit;
this jar file does not ship with our CDH5.3 anyway (we downloaded it
from http://mvnrepository.com/artifact/com.twitter/parquet-hive/1.5.0)
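
For reference, the jar can be attached at submit time with the --jars flag
(the path below is hypothetical):

spark-submit --jars /path/to/parquet-hive-bundle-1.5.0.jar ...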

hope this helps!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-issue-Spark-1-3-1-hadoop-2-6-on-CDH5-3-with-parquet-tp22808p22810.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Is it possible to set the akka specify properties (akka.extensions) in spark

2015-05-07 Thread Terry Hole
Hi all,

I'd like to monitor the akka using kamon, which need to set the
akka.extension to a list like this in typesafe config format:
  akka {
extensions = ["kamon.system.SystemMetrics", "kamon.statsd.StatsD"]
  }

But I cannot find a way to do this. I have tried:
1. SparkConf.set("akka.extensions", """["kamon.system.SystemMetrics",
"kamon.statsd.StatsD"]""")
2. using application.conf and setting it via the java option
"-Dconfig.resource=/path/to/conf"
3. setting "akka.extensions ["kamon.system.SystemMetrics",
"kamon.statsd.StatsD"]" in the spark conf file

None of these works.

Do we have others ways to set this?

Thanks!


Re: SparkSQL issue: Spark 1.3.1 + hadoop 2.6 on CDH5.3 with parquet

2015-05-07 Thread Marcelo Vanzin
On Thu, May 7, 2015 at 7:39 PM, felicia  wrote:

> we tried to add /usr/lib/parquet/lib & /usr/lib/parquet to SPARK_CLASSPATH
> and it doesn't seem to work,
>

To add the jars to the classpath you need to use "/usr/lib/parquet/lib/*",
otherwise you're just adding the directory (and not the files within it).

-- 
Marcelo


Mismatch when using sparkSQL to insert data into a hive table with a dynamic DateType partition

2015-05-07 Thread Gerald-G
Hi:

 Spark version is 1.3.1

 I used sparksql to insert data into a hive table with a DateType partition.

 The partition column is translated to an int, which is the number of days
since 1970-01-01.

fat_tbUser is a very simple hive table with a DateType partition column
dtstatdate and a data column userid.

  "INSERT INTO TABLE fat_tbUser "
+ "PARTITION (dtstatdate) "
+ "SELECT userid, "
+ "'2015-05-08' AS dtstatdate "
+ "FROM fat_tbUser WHERE dtStatDate = date_add('2015-05-08', -2)"


When I run it in sparkSQL, Hive gets dtstatdate=16563 instead of 2015-05-08.


Any Suggestion?

Thanks
Yours

Meng


Re: Spark 1.3 createDataframe error with pandas df

2015-05-07 Thread kevindahl
It seems I am still having the same issue in different scenarios. Using the
'dylanmei/docker-zeppelin' container, I get the same issue as before when
trying to create a Spark dataframe from a pandas dataframe.

code:

%pyspark
import pandas as pd

names = ['Bob','Jessica','Mary','John','Mel']
births = [968, 155, 77, 578, 973]
BabyDataSet = zip(names,births)
df = pd.DataFrame(data = BabyDataSet, columns=['Names', 'Births'])
rdf = sqlc.createDataFrame(df)

result:

(<class 'py4j.protocol.Py4JJavaError'>, Py4JJavaError(u'An error occurred
while calling z:org.apache.spark.api.python.PythonRDD.runJob.\n', JavaObject
id=o49), <traceback object>)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-3-createDataframe-error-with-pandas-df-tp22053p22809.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Spark does not delete temporary directories

2015-05-07 Thread Taeyun Kim
It seems that they are always empty.

 

I've traced the spark source code.

The module methods that create the 3 'temp' directories are as follows:

 

- DiskBlockManager.createLocalDirs

- HttpFileServer.initialize

- SparkEnv.sparkFilesDir

 

They (eventually) call Utils.getOrCreateLocalRootDirs and then
Utils.createDirectory, which intentionally does NOT mark the directory for
automatic deletion.

The comment of createDirectory method says: "The directory is guaranteed to
be newly created, and is not marked for automatic deletion."

I don't know why they are not marked. Is this really intentional?

 

From: Haopu Wang [mailto:hw...@qilinsoft.com] 
Sent: Friday, May 08, 2015 11:37 AM
To: Taeyun Kim; Ted Yu; Todd Nist; user@spark.apache.org
Subject: RE: Spark does not delete temporary directories

 

I think the temporary folders are used to store blocks and shuffles. That
doesn't depend on the cluster manager.

Ideally they should be removed after the application has been terminated.

Can you check if there are contents under those folders?

 

  _  

From: Taeyun Kim [mailto:taeyun@innowireless.com] 
Sent: Friday, May 08, 2015 9:42 AM
To: 'Ted Yu'; 'Todd Nist'; user@spark.apache.org
Subject: RE: Spark does not delete temporary directories

 

Thanks, but it seems that the option is for Spark standalone mode only.

I've (lightly) tested the options with local mode and yarn-client mode, the
'temp' directories were not deleted.

 

From: Ted Yu [mailto:yuzhih...@gmail.com] 
Sent: Thursday, May 07, 2015 10:47 PM
To: Todd Nist
Cc: Taeyun Kim; user@spark.apache.org
Subject: Re: Spark does not delete temporary directories

 

Default value for spark.worker.cleanup.enabled is false:


private val CLEANUP_ENABLED =
conf.getBoolean("spark.worker.cleanup.enabled", false)

 

I wonder if the default should be set as true.

 

Cheers

 

On Thu, May 7, 2015 at 6:19 AM, Todd Nist  wrote:

Have you tried to set the following?

spark.worker.cleanup.enabled=true 
spark.worker.cleanup.appDataTtl=<ttl in seconds>

 

 

On Thu, May 7, 2015 at 2:39 AM, Taeyun Kim 
wrote:

Hi,

 

After a spark program completes, there are 3 temporary directories remain in
the temp directory.

The file names are like this: spark-2e389487-40cc-4a82-a5c7-353c0feefbb7

 

And when the Spark program runs on Windows, a snappy DLL file also remains in
the temp directory.

The file name is like this:
snappy-1.0.4.1-6e117df4-97b6-4d69-bf9d-71c4a627940c-snappyjava

 

They are created every time the Spark program runs. So the number of files
and directories keeps growing.

 

How can I have them deleted?

 

Spark version is 1.3.1 with Hadoop 2.6.

 

Thanks.

 

 

 

 



SparkSQL issue: Spark 1.3.1 + hadoop 2.6 on CDH5.3 with parquet

2015-05-07 Thread felicia
Hi all,
I'm able to run SparkSQL through python/java and retrieve data from an
ordinary table,
but when trying to fetch data from a parquet table, the following error shows
up, which pretty straightforwardly indicates that a parquet-related class was
not found.
We tried to add /usr/lib/parquet/lib & /usr/lib/parquet to SPARK_CLASSPATH
and it doesn't seem to work.
What is the proper way to solve this issue? Do we have to re-compile Spark
with parquet-related jars?

Thanks!

ERROR hive.log: error in initSerDe: java.lang.ClassNotFoundException Class
parquet.hive.serde.ParquetHiveSerDe not found
java.lang.ClassNotFoundException: Class parquet.hive.serde.ParquetHiveSerDe
not found
at
org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:1980)
at
org.apache.hadoop.hive.metastore.MetaStoreUtils.getDeserializer(MetaStoreUtils.java:337)
at
org.apache.hadoop.hive.ql.metadata.Table.getDeserializerFromMetaStore(Table.java:288)
at 
org.apache.hadoop.hive.ql.metadata.Table.getDeserializer(Table.java:281)
at org.apache.hadoop.hive.ql.metadata.Table.getCols(Table.java:631)
at 
org.apache.hadoop.hive.ql.metadata.Table.checkValidity(Table.java:189)
at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:1017)
at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:950)
at
org.apache.spark.sql.hive.HiveMetastoreCatalog.lookupRelation(HiveMetastoreCatalog.scala:78)
at
org.apache.spark.sql.hive.HiveContext$$anon$2.org$apache$spark$sql$catalyst$analysis$OverrideCatalog$$super$lookupRelation(HiveContext.scala:253)
at
org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:137)
at
org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:137)
at scala.Option.getOrElse(Option.scala:120)
at
org.apache.spark.sql.catalyst.analysis.OverrideCatalog$class.lookupRelation(Catalog.scala:137)
at
org.apache.spark.sql.hive.HiveContext$$anon$2.lookupRelation(HiveContext.scala:253)
at
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(Analyzer.scala:143)
at
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(Analyzer.scala:138)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:162)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:191)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:147)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:162)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:191)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:147)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:135)
at
org.apache.sp

RE: Spark does not delete temporary directories

2015-05-07 Thread Haopu Wang
I think the temporary folders are used to store blocks and shuffles.
That doesn't depend on the cluster manager.

Ideally they should be removed after the application has been
terminated.

Can you check if there are contents under those folders?

 



From: Taeyun Kim [mailto:taeyun@innowireless.com] 
Sent: Friday, May 08, 2015 9:42 AM
To: 'Ted Yu'; 'Todd Nist'; user@spark.apache.org
Subject: RE: Spark does not delete temporary directories

 

Thanks, but it seems that the option is for Spark standalone mode only.

I've (lightly) tested the options with local mode and yarn-client mode,
the 'temp' directories were not deleted.

 

From: Ted Yu [mailto:yuzhih...@gmail.com] 
Sent: Thursday, May 07, 2015 10:47 PM
To: Todd Nist
Cc: Taeyun Kim; user@spark.apache.org
Subject: Re: Spark does not delete temporary directories

 

Default value for spark.worker.cleanup.enabled is false:


private val CLEANUP_ENABLED =
conf.getBoolean("spark.worker.cleanup.enabled", false)

 

I wonder if the default should be set as true.

 

Cheers

 

On Thu, May 7, 2015 at 6:19 AM, Todd Nist  wrote:

Have you tried to set the following?

spark.worker.cleanup.enabled=true 
spark.worker.cleanup.appDataTtl=<ttl in seconds>

 

 

On Thu, May 7, 2015 at 2:39 AM, Taeyun Kim 
wrote:

Hi,

 

After a spark program completes, there are 3 temporary directories
remain in the temp directory.

The file names are like this: spark-2e389487-40cc-4a82-a5c7-353c0feefbb7

 

And when the Spark program runs on Windows, a snappy DLL file also remains in
the temp directory.

The file name is like this:
snappy-1.0.4.1-6e117df4-97b6-4d69-bf9d-71c4a627940c-snappyjava

 

They are created every time the Spark program runs. So the number of
files and directories keeps growing.

 

How can I have them deleted?

 

Spark version is 1.3.1 with Hadoop 2.6.

 

Thanks.

 

 

 

 



Re: Duplicate entries in output of mllib column similarities

2015-05-07 Thread Reza Zadeh
This shouldn't be happening, do you have an example to reproduce it?

On Thu, May 7, 2015 at 4:17 PM, rbolkey  wrote:

> Hi,
>
> I have a question regarding one of the oddities we encountered while
> running
> mllib's column similarities operation. When we examine the output, we find
> duplicate matrix entries (the same i,j). Sometimes the entries have the
> same
> value/similarity score, but they're frequently different too.
>
> Is this a known issue? An artifact of the probabilistic nature of the
> output? Which output score should we trust (lower vs higher one when
> different)? We're using a threshold of 0.3, and running Spark 1.3.1 on a 10
> node cluster.
>
> Thanks
> Rick
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Duplicate-entries-in-output-of-mllib-column-similarities-tp22807.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: sparksql running slow while joining 2 tables.

2015-05-07 Thread luohui20001
I tried a small spark.sql.shuffle.partitions = 16, so that every task fetches a
roughly equal amount of data; however, every task still runs slow.
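
For reference, this is how I apply the setting per session (assuming a
SQLContext named sqlContext):

sqlContext.setConf("spark.sql.shuffle.partitions", "16")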




 

Thanks&Best regards!
San.Luo

- Original Message -
From:
To: "luohui20001", "java8964", "user"
Subject: Re: sparksql running slow while joining 2 tables.
Date: May 8, 2015, 09:01

hi yong,  I checked the tasks again and found some running with small data
while a few tasks run with more data. Is that the "data skew" you just
mentioned?




 

Thanks&Best regards!
San.Luo

- Original Message -
From:
To: "java8964", "user"
Subject: Re: sparksql running slow while joining 2 tables.
Date: May 8, 2015, 08:14

I checked the data again; there is no skewed data in it. It is just txt files
with several string and int fields, that's it. I also followed the suggestions
on the tuning guide page; refer to
http://spark.apache.org/docs/latest/tuning.html#garbage-collection-tuning
I will keep investigating why those remaining tasks stop running or run
slowly. Thanks.




 

Thanks&Best regards!
San.Luo

- Original Message -
From: java8964
To: "luohui20...@sina.com", user
Subject: RE: Re: sparksql running slow while joining 2 tables.
Date: May 7, 2015, 08:27



It looks like you have data in these 24 partitions, or more. How many unique
names are in your data set?
Enlarging the shuffle partitions only makes sense if you have large partition
groups in your data. What you described looks like either your dataset having
data in these 24 partitions, or skewed data in these 24 partitions.
If you really join 56M of data with 26M of data, I am surprised that you have
24 partitions running very slow under an 8G executor.
Yong
Date: Wed, 6 May 2015 14:04:11 +0800
From: luohui20...@sina.com
To: luohui20...@sina.com; hao.ch...@intel.com; daoyuan.w...@intel.com; 
ssab...@gmail.com; user@spark.apache.org
Subject: Re: sparksql running slow while joining 2 tables.

update status after I did some tests. I modified some other parameters and
found 2 parameters that may be relevant:
spark_worker_instance and spark.sql.shuffle.partitions


Before today I used the default settings of spark_worker_instance and
spark.sql.shuffle.partitions, whose values are 1 and 200. At that time, my app
stopped running at 5/200 tasks.


Then I changed spark_worker_instance to 2, and my app progressed to about
116/200 tasks. I then changed spark_worker_instance to 4 and got further
progress, to 176/200. However, when I changed it to 8 or even more, like 12
workers, it was still 176/200.


Later, new findings came to me while trying different
spark.sql.shuffle.partitions. If I change it to 50, 400, or 800 partitions, it
stops at 26/50, 376/400, 776/800 tasks - always leaving 24 tasks unable to
finish.


Not sure why this happens. Hope this info is helpful for solving it.




 
Thanks&Best regards!
罗辉 San.Luo

- Original Message -
From:
To: "Cheng, Hao", "Wang, Daoyuan", "Olivier Girardot", "user"
Subject: Re: sparksql running slow while joining 2 tables.
Date: May 6, 2015, 09:51

db has 1.7 million records while sample has 0.6 million. For jvm settings, I
tried the default settings and also tried to apply 4g by "export _java_opts
4g"; the app still stops running.
BTW, here is some detailed info about gc and jvm.
- Original Message -
From: "Cheng, Hao"
To: "luohui20...@sina.com", "Wang, Daoyuan", Olivier Girardot, user
Subject: RE: Re: sparksql running slow while joining 2 tables.
Date: May 5, 2015, 20:50
56mb / 26mb is a very small size; do you observe data skew? More precisely,
many records with the same chrname / name? And can you also double-check the
jvm settings for the executor process?
 
 
From: luohui20...@sina.com [mailto:luohui20...@sina.com]
Sent: Tuesday, May 5, 2015 7:50 PM
To: Cheng, Hao; Wang, Daoyuan; Olivier Girardot; user
Subject: Re: sparksql running slow while joining 2 tables.
 
Hi guys,
  attached is a picture of the physical plan and the logs. Thanks.

 
Thanks&Best regards!
罗辉 San.Luo
 
- Original Message -
From: "Cheng, Hao"
To: "Wang, Daoyuan", "luohui20...@sina.com", Olivier Girardot, user
Subject: Re: sparksql running slow while joining 2 tables.
Date: May 5, 2015, 13:18
 
I assume you’re using the DataFrame API within your application.
 
sql(“SELECT…”).explain(true)
 
From: Wang, Daoyuan
Sent: Tuesday, May 5, 2015 10:16 AM
To: luohui20...@sina.com; Cheng, Hao; Olivier Girardot; user
Subject: RE: Re: sparksql running slow while joining 2 tables.
 
You can use
Explain extended select ….
 
From:
luohui20...@sina.com [mailto:luohui20...@sina.com]
Sent: Tuesday, May 05, 2015 9:52 AM
To: Cheng, Hao; Olivier Girardot; user
Subject: Re: sparksql running slow while joining 2 tables.
 
As I know, broadcast join is automatically enabled by
spark.sql.autoBroadcastJoinThreshold.
refer to 
http://spark.apache.org/d

RE: Spark does not delete temporary directories

2015-05-07 Thread Taeyun Kim
Thanks, but it seems that the option is for Spark standalone mode only.

I’ve (lightly) tested the options with local mode and yarn-client mode, the 
‘temp’ directories were not deleted.

 

From: Ted Yu [mailto:yuzhih...@gmail.com] 
Sent: Thursday, May 07, 2015 10:47 PM
To: Todd Nist
Cc: Taeyun Kim; user@spark.apache.org
Subject: Re: Spark does not delete temporary directories

 

Default value for spark.worker.cleanup.enabled is false:


private val CLEANUP_ENABLED = conf.getBoolean("spark.worker.cleanup.enabled", 
false)

 

I wonder if the default should be set as true.

 

Cheers

 

On Thu, May 7, 2015 at 6:19 AM, Todd Nist  wrote:

Have you tried to set the following?

spark.worker.cleanup.enabled=true 
spark.worker.cleanup.appDataTtl=<ttl in seconds>

 

 

On Thu, May 7, 2015 at 2:39 AM, Taeyun Kim  wrote:

Hi,

 

After a spark program completes, there are 3 temporary directories remain in 
the temp directory.

The file names are like this: spark-2e389487-40cc-4a82-a5c7-353c0feefbb7

 

And when the Spark program runs on Windows, a snappy DLL file also remains in
the temp directory.

The file name is like this: 
snappy-1.0.4.1-6e117df4-97b6-4d69-bf9d-71c4a627940c-snappyjava

 

They are created every time the Spark program runs. So the number of files and 
directories keeps growing.

 

How can I have them deleted?

 

Spark version is 1.3.1 with Hadoop 2.6.

 

Thanks.

 

 

 

 



Re: Predict.scala using model for clustering In reference

2015-05-07 Thread Joseph Bradley
A KMeansModel was trained in the previous step, and it was saved to
"modelFile" as a Java object file.  This step is loading the model back and
reconstructing the KMeansModel, which can then be used to classify new
tweets into different clusters.
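
For example, a minimal usage sketch once the model is reconstructed (the
feature vector below is hypothetical; in the reference app it comes from
featurizing the tweet text):

import org.apache.spark.mllib.linalg.Vectors

val features = Vectors.dense(0.1, 0.2, 0.3)  // hypothetical tweet features
val clusterId = model.predict(features)      // index of the nearest cluster center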
Joseph

On Thu, May 7, 2015 at 12:40 PM, anshu shukla 
wrote:

> Can anyone please explain -
>
> println("Initalizaing the the KMeans model...")
> val model = new 
> KMeansModel(ssc.sparkContext.objectFile[Vector](modelFile.toString).collect())
>
> where modelFile is the *directory used to persist the model during training*
>
>
>   REF-
>
>
> https://github.com/databricks/reference-apps/blob/master/twitter_classifier/predict.md
>
>
> --
> Thanks & Regards,
> Anshu Shukla
>


Re: sparksql running slow while joining 2 tables.

2015-05-07 Thread luohui20001
I checked the data again; there is no skewed data in it. It is just txt files
with several string and int fields, that's it. I also followed the suggestions
on the tuning guide page; refer to
http://spark.apache.org/docs/latest/tuning.html#garbage-collection-tuning
I will keep investigating why those remaining tasks stop running or run
slowly. Thanks.




 

Thanks&Best regards!
San.Luo

- Original Message -
From: java8964
To: "luohui20...@sina.com", user
Subject: RE: Re: sparksql running slow while joining 2 tables.
Date: May 7, 2015, 08:27



It looks like you have data in these 24 partitions, or more. How many unique
names are in your data set?
Enlarging the shuffle partitions only makes sense if you have large partition
groups in your data. What you described looks like either your dataset having
data in these 24 partitions, or skewed data in these 24 partitions.
If you really join 56M of data with 26M of data, I am surprised that you have
24 partitions running very slow under an 8G executor.
Yong
Date: Wed, 6 May 2015 14:04:11 +0800
From: luohui20...@sina.com
To: luohui20...@sina.com; hao.ch...@intel.com; daoyuan.w...@intel.com; 
ssab...@gmail.com; user@spark.apache.org
Subject: Re: sparksql running slow while joining 2 tables.

update status after I did some tests. I modified some other parameters and
found 2 parameters that may be relevant:
spark_worker_instance and spark.sql.shuffle.partitions


Before today I used the default settings of spark_worker_instance and
spark.sql.shuffle.partitions, whose values are 1 and 200. At that time, my app
stopped running at 5/200 tasks.


Then I changed spark_worker_instance to 2, and my app progressed to about
116/200 tasks. I then changed spark_worker_instance to 4 and got further
progress, to 176/200. However, when I changed it to 8 or even more, like 12
workers, it was still 176/200.


Later, new findings came to me while trying different
spark.sql.shuffle.partitions. If I change it to 50, 400, or 800 partitions, it
stops at 26/50, 376/400, 776/800 tasks - always leaving 24 tasks unable to
finish.


Not sure why this happens. Hope this info is helpful for solving it.




 
Thanks&Best regards!
罗辉 San.Luo

- Original Message -
From:
To: "Cheng, Hao", "Wang, Daoyuan", "Olivier Girardot", "user"
Subject: Re: sparksql running slow while joining 2 tables.
Date: May 6, 2015, 09:51

db has 1.7 million records while sample has 0.6 million. For jvm settings, I
tried the default settings and also tried to apply 4g by "export _java_opts
4g"; the app still stops running.
BTW, here is some detailed info about gc and jvm.
- Original Message -
From: "Cheng, Hao"
To: "luohui20...@sina.com", "Wang, Daoyuan", Olivier Girardot, user
Subject: RE: Re: sparksql running slow while joining 2 tables.
Date: May 5, 2015, 20:50
56mb / 26mb is a very small size; do you observe data skew? More precisely,
many records with the same chrname / name? And can you also double-check the
jvm settings for the executor process?
 
 
From: luohui20...@sina.com [mailto:luohui20...@sina.com]
Sent: Tuesday, May 5, 2015 7:50 PM
To: Cheng, Hao; Wang, Daoyuan; Olivier Girardot; user
Subject: Re: sparksql running slow while joining 2 tables.
 
Hi guys,
  attached is a picture of the physical plan and the logs. Thanks.

 
Thanks&Best regards!
罗辉 San.Luo
 
- Original Message -
From: "Cheng, Hao"
To: "Wang, Daoyuan", "luohui20...@sina.com", Olivier Girardot, user
Subject: Re: sparksql running slow while joining 2 tables.
Date: May 5, 2015, 13:18
 
I assume you’re using the DataFrame API within your application.
 
sql(“SELECT…”).explain(true)
 
From: Wang, Daoyuan
Sent: Tuesday, May 5, 2015 10:16 AM
To: luohui20...@sina.com; Cheng, Hao; Olivier Girardot; user
Subject: RE: Re: sparksql running slow while joining 2 tables.
 
You can use
Explain extended select ….
 
From:
luohui20...@sina.com [mailto:luohui20...@sina.com]
Sent: Tuesday, May 05, 2015 9:52 AM
To: Cheng, Hao; Olivier Girardot; user
Subject: Re: sparksql running slow while joining 2 tables.
 
As I know, broadcast join is automatically enabled by
spark.sql.autoBroadcastJoinThreshold.
refer to 
http://spark.apache.org/docs/latest/sql-programming-guide.html#other-configuration-options
 
And how do I check my app's physical plan, and other things like the optimized
plan, executable plan, etc.?
 
thanks
 

 
Thanks&Best regards!
罗辉 San.Luo
 
- Original Message -
From: "Cheng, Hao"
To: "Cheng, Hao", "luohui20...@sina.com", Olivier Girardot, user
Subject: RE: Re: sparksql running slow while joining 2 tables.
Date: May 5, 2015, 08:38
 
Or, have you ever tried broadcast join?
 
From: Cheng, Hao [mailto:hao.ch...@intel.com]
Sent: Tuesday, May 5, 2015 8:33 AM
To: luohui20...@sina.com; Olivier Girardot; user
Subject: RE: Re: sparksql running slow while joining 2 tables.
 
Can you print out the physical plan?
 
EXPLAIN SELECT xxx…
 
From: luohui20...@

Re: Stop Cluster Mode Running App

2015-05-07 Thread Silvio Fiorito
Hi James,

If you’re on Spark 1.3 you can use the kill command in spark-submit to shut it 
down. You’ll need the driver id from the Spark UI or from when you submitted 
the app.

spark-submit --master spark://master:7077 --kill <driver-id>

Thanks,
Silvio

From: James King
Date: Wednesday, May 6, 2015 at 12:02 PM
To: user
Subject: Stop Cluster Mode Running App

I submitted a Spark application in cluster mode, and now every time I stop the
cluster and restart it the job resumes execution.

I even killed a daemon called DriverWrapper; it stops the app but it resumes
again.

How can I stop this application from running?


Duplicate entries in output of mllib column similarities

2015-05-07 Thread rbolkey
Hi,

I have a question regarding one of the oddities we encountered while running
mllib's column similarities operation. When we examine the output, we find
duplicate matrix entries (the same i,j). Sometimes the entries have the same
value/similarity score, but they're frequently different too.

Is this a known issue? An artifact of the probabilistic nature of the
output? Which output score should we trust (lower vs higher one when
different)? We're using a threshold of 0.3, and running Spark 1.3.1 on a 10
node cluster.

Thanks
Rick



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Duplicate-entries-in-output-of-mllib-column-similarities-tp22807.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: question about the TFIDF.

2015-05-07 Thread ai he
Hi Dan,

In
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/feature/HashingTF.scala,
you can see spark uses Utils.nonNegativeMod(term.##, numFeatures) to locate
a term.

It's also mentioned in the doc that " Maps a sequence of terms to their
term frequencies using the hashing trick."
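
A minimal sketch of that mapping, assuming the scheme described above (a
term's index is its hashCode modulo numFeatures, made non-negative; HashingTF's
default numFeatures is 2^20 = 1048576, which matches the vector size in Dan's
output):

def termIndex(term: String, numFeatures: Int = 1 << 20): Int = {
  val mod = term.## % numFeatures
  if (mod < 0) mod + numFeatures else mod
}
// termIndex("word1") gives the slot of the 1048576-wide tf-idf vector that
// "word1" occupies, so you can match your vocabulary to the scores.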

Thanks


On Wed, May 6, 2015 at 12:44 PM, Dan Dong  wrote:

> Hi, All,
>   When I try to follow the document about tfidf from:
> http://spark.apache.org/docs/latest/mllib-feature-extraction.html
>
>  val conf = new SparkConf().setAppName("TFIDF")
>  val sc=new SparkContext(conf)
>
>  val
> documents=sc.textFile("hdfs://cluster-test-1:9000/user/ubuntu/textExample.txt").map(_.split("
> ").toSeq)
>  val hashingTF = new HashingTF()
>  val tf= hashingTF.transform(documents)
>  tf.cache()
>  val idf = new IDF().fit(tf)
>  val tfidf = idf.transform(tf)
>  val rdd=tfidf.map { vec => vec}
>  rdd.saveAsTextFile("/user/ubuntu/aaa")
>
> I got the following 3 lines of output, which correspond to my 3-line input
> file (each line can be viewed as a separate document):
>
> (1048576,[3211,72752,119839,413342,504006,714241],[1.3862943611198906,0.6931471805599453,0.0,0.6931471805599453,0.6931471805599453,0.6931471805599453])
>
>
> (1048576,[53232,96852,109270,119839],[0.6931471805599453,0.6931471805599453,0.6931471805599453,0.0])
>
>
> (1048576,[3139,5740,119839,502586,503762],[0.6931471805599453,0.6931471805599453,0.0,0.6931471805599453,0.6931471805599453])
>
> But how to interpret this? How to match words to the tfidf values? E.g:
> word1->1.3862943611198906
> word2->0.6931471805599453
> ..
>
> In general, how should people interpret/analyze "tfidf" from the
> following? Thanks!
> val tfidf = idf.transform(tf)
>
>   Cheers,
>   Dan
>
>
>


-- 
Best
Ai


Getting data into Spark Streaming

2015-05-07 Thread Sathaye
Hi, I am pretty new to Spark and I am trying to implement a simple Spark
streaming application using Meetup's RSVP stream: stream.meetup.com/2/rsvps
Any idea how to connect the stream to Spark Streaming?
I am trying out rawSocketStream but am not sure what the parameters are (viz.
port).

Thank you



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Getting-data-into-Spark-Streaming-tp22806.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Blocking DStream.forEachRDD()

2015-05-07 Thread Corey Nolet
It does look like the function that's executed is in the driver, so doing an
Await.result() on a thread AFTER I've executed an action should work. Just
updating this here in case anyone has this question in the future.

Is this something I can do? I am using a FileOutputFormat inside of the
foreachRDD call. After the output format runs, I want to do some directory
cleanup, and I want to block while I'm doing that. Is that something I can
do inside of this function? If not, where would I accomplish this on every
micro-batch interval?


Re: Loading file content based on offsets into the memory

2015-05-07 Thread in4maniac
When loading multiple files, Spark loads each file as a partition (block). You
can run a function on each partition by using the rdd.mapPartitions(function)
method.

I think you can write a function that extracts everything after the offset
and use this function with mapPartitions to extract the relevant lines for
each file.
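
A minimal sketch of that idea (the offset logic here, skipping a fixed number
of lines per partition, is a placeholder to adapt to however your offsets are
defined):

val rdd = sc.textFile("hdfs:///data/input")  // hypothetical path
val offset = 100                             // hypothetical per-file offset
val relevant = rdd.mapPartitions(lines => lines.drop(offset))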

Hope this helps





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Loading-file-content-based-on-offsets-into-the-memory-tp22802p22804.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Virtualenv pyspark

2015-05-07 Thread alemagnani
I am currently using pyspark with a virtualenv.
Unfortunately, I don't have access to the nodes' file system and therefore I
cannot manually copy the virtual env over there.

I have been using this technique:

I first add a tar ball with the venv
sc.addFile(virtual_env_tarball_file)

Then in the code used on the node to do the computation I activate the venv
like this: 
venv_location = SparkFiles.get(venv_name)
activate_env="%s/bin/activate_this.py" % venv_location
execfile(activate_env, dict(__file__=activate_env))

Is there a better way to do this?
One of the problems with this approach is that in
spark/python/pyspark/statcounter.py, numpy is imported
before the venv is activated, and this can cause conflicts with the venv's
numpy.

Moreover, this requires the venv to be sent around the cluster all the
time.
Any suggestions?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Virtualenv-pyspark-tp22803.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Map one RDD into two RDD

2015-05-07 Thread anshu shukla
One of the best discussions on the mailing list :-) ... Please help me in
concluding --

The whole discussion concludes that:

1. The framework does not support increasing the parallelism of any task just
by an inbuilt function.
2. The user has to manually write logic to filter the output of an upstream
node in the DAG to manage the input to downstream nodes (like shuffle grouping
etc. in STORM).
3. If we want to increase the level of parallelism of the Twitter streaming
Spout to get a higher rate of the DStream of tweets (to increase the rate of
input), how is that possible?

  val tweetStream = TwitterUtils.createStream(ssc, Utils.getAuth)



On Fri, May 8, 2015 at 2:16 AM, Evo Eftimov  wrote:

> 1. Will rdd2.filter run before rdd1.filter finish?
>
>
>
> YES
>
>
>
> 2. We have to traverse rdd twice. Any comments?
>
>
>
> You can invoke filter or whatever other transformation / function many
> times
>
> Ps: you  have to study / learn the Parallel Programming Model of an OO
> Framework like Spark – in any OO Framework lots of Behavior is hidden /
> encapsulated by the Framework and the client code gets invoked at specific
> points in the Flow of Control / Data based on callback functions
>
>
>
> That’s why stuff like RDD.filter(), RDD.filter() may look “sequential” to
> you but it is not
>
>
>
>
>
> *From:* Bill Q [mailto:bill.q@gmail.com]
> *Sent:* Thursday, May 7, 2015 6:27 PM
>
> *To:* Evo Eftimov
> *Cc:* user@spark.apache.org
> *Subject:* Re: Map one RDD into two RDD
>
>
>
> The multi-threading code in Scala is quite simple and you can google it
> pretty easily. We used the Future framework. You can use Akka also.
>
>
>
> @Evo My concerns for filtering solution are: 1. Will rdd2.filter run
> before rdd1.filter finish? 2. We have to traverse rdd twice. Any comments?
>
>
>
> On Thursday, May 7, 2015, Evo Eftimov  wrote:
>
> Scala is a language, Spark is an OO/Functional, Distributed Framework
> facilitating Parallel Programming in a distributed environment
>
>
>
> Any “Scala parallelism” occurs within the Parallel Model imposed by the
> Spark OO Framework – ie it is limited in terms of what it can achieve in
> terms of influencing the Spark Framework behavior – that is the nature of
> programming with/for frameworks
>
>
>
> When RDD1 and RDD2 are partitioned and different Actions applied to them
> this will result in Parallel Pipelines / DAGs within the Spark Framework
>
> RDD1 = RDD.filter()
>
> RDD2 = RDD.filter()
>
>
>
>
>
> *From:* Bill Q [mailto:bill.q@gmail.com]
> *Sent:* Thursday, May 7, 2015 4:55 PM
> *To:* Evo Eftimov
> *Cc:* user@spark.apache.org
> *Subject:* Re: Map one RDD into two RDD
>
>
>
> Thanks for the replies. We decided to use concurrency in Scala to do the
> two mappings using the same source RDD in parallel. So far, it seems to be
> working. Any comments?
>
> On Wednesday, May 6, 2015, Evo Eftimov  wrote:
>
> RDD1 = RDD.filter()
>
> RDD2 = RDD.filter()
>
>
>
> *From:* Bill Q [mailto:bill.q@gmail.com ]
> *Sent:* Tuesday, May 5, 2015 10:42 PM
> *To:* user@spark.apache.org
> *Subject:* Map one RDD into two RDD
>
>
>
> Hi all,
>
> I have a large RDD that I map a function over. Based on the nature of
> each record in the input RDD, I will generate two types of data. I would
> like to save each type into its own RDD. But I can't seem to find an
> efficient way to do it. Any suggestions?
>
>
>
> Many thanks.
>
>
>
>
>
> Bill
>
>
>
> --
>
> Many thanks.
>
> Bill
>
>
>
>
>
> --
>
> Many thanks.
>
> Bill
>
>
>
>
>
> --
>
> Many thanks.
>
> Bill
>
>
>



-- 
Thanks & Regards,
Anshu Shukla


RE: Map one RDD into two RDD

2015-05-07 Thread Evo Eftimov
1. Will rdd2.filter run before rdd1.filter finishes?

 

YES

 

2. We have to traverse the rdd twice. Any comments?

 

You can invoke filter or whatever other transformation / function many times 



Ps: you  have to study / learn the Parallel Programming Model of an OO 
Framework like Spark – in any OO Framework lots of Behavior is hidden / 
encapsulated by the Framework and the client code gets invoked at specific 
points in the Flow of Control / Data based on callback functions 

 

That’s why stuff like RDD.filter(), RDD.filter() may look “sequential” to you 
but it is not  
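
A minimal, self-contained sketch of the two-filter split discussed in this
thread (the example predicate is mine):

val source = sc.parallelize(1 to 1000).cache()  // cache so each filter reuses one computation
val rdd1 = source.filter(_ % 2 == 0)            // first "type" of record
val rdd2 = source.filter(_ % 2 != 0)            // second "type" of record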

 

 

From: Bill Q [mailto:bill.q@gmail.com] 
Sent: Thursday, May 7, 2015 6:27 PM
To: Evo Eftimov
Cc: user@spark.apache.org
Subject: Re: Map one RDD into two RDD

 

The multi-threading code in Scala is quite simple and you can google it pretty 
easily. We used the Future framework. You can use Akka also.

 

@Evo My concerns about the filtering solution are: 1. Will rdd2.filter run
before rdd1.filter finishes? 2. We have to traverse the rdd twice. Any comments?



On Thursday, May 7, 2015, Evo Eftimov  wrote:

Scala is a language, Spark is an OO/Functional, Distributed Framework 
facilitating Parallel Programming in a distributed environment 

 

Any “Scala parallelism” occurs within the Parallel Model imposed by the Spark 
OO Framework – ie it is limited in terms of what it can achieve in terms of 
influencing the Spark Framework behavior – that is the nature of programming 
with/for frameworks 

 

When RDD1 and RDD2 are partitioned and different Actions applied to them this 
will result in Parallel Pipelines / DAGs within the Spark Framework

RDD1 = RDD.filter()

RDD2 = RDD.filter()

 

 

From: Bill Q [mailto:bill.q@gmail.com 
 ] 
Sent: Thursday, May 7, 2015 4:55 PM
To: Evo Eftimov
Cc: user@spark.apache.org 
 
Subject: Re: Map one RDD into two RDD

 

Thanks for the replies. We decided to use concurrency in Scala to do the two 
mappings using the same source RDD in parallel. So far, it seems to be working. 
Any comments?

On Wednesday, May 6, 2015, Evo Eftimov  > wrote:

RDD1 = RDD.filter()

RDD2 = RDD.filter()

 

From: Bill Q [mailto:bill.q@gmail.com] 
Sent: Tuesday, May 5, 2015 10:42 PM
To: user@spark.apache.org
Subject: Map one RDD into two RDD

 

Hi all,

I have a large RDD that I map a function to it. Based on the nature of each 
record in the input RDD, I will generate two types of data. I would like to 
save each type into its own RDD. But I can't seem to find an efficient way to 
do it. Any suggestions?

 

Many thanks.

 

 

Bill



-- 

Many thanks.

Bill

 



-- 

Many thanks.

Bill

 



-- 

Many thanks.



Bill

 



Re: Spark 1.3.1 and Parquet Partitions

2015-05-07 Thread in4maniac
Hi V, 

I am assuming that each of the three .parquet paths you mentioned has
multiple partitions in it.

For eg: [/dataset/city=London/data.parquet/part-r-0.parquet,
/dataset/city=London/data.parquet/part-r-1.parquet]

I haven't personally used this with "hdfs", but I've worked with a similar
file structure with '=' in "S3".

The way I get around this is by building a string of all the filepaths
separated by commas (with NO spaces in between), then using that string as
the filepath parameter. I think the following adaptation of the S3 file access
pattern to HDFS would work:

If I want to load 1 file:
sqlcontext.parquetFile( "hdfs://some
ip:8029/dataset/city=London/data.parquet")

If I want to load multiple files (lets say all 3 of them):
sqlcontext.parquetFile( "hdfs://some
ip:8029/dataset/city=London/data.parquet,hdfs://some
ip:8029/dataset/city=NewYork/data.parquet,hdfs://some
ip:8029/dataset/city=Paris/data.parquet")

*** But in the multiple file scenario, the schema of all the files should be
the same
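
As a sketch, the string can be built programmatically (the namenode host/port
and the city values are placeholders; sqlcontext as above):

// Build one comma-separated path string with NO spaces in between.
val cities = Seq("London", "NewYork", "Paris")
val paths = cities
  .map(c => s"hdfs://namenode:8029/dataset/city=$c/data.parquet")
  .mkString(",")
val df = sqlcontext.parquetFile(paths)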

I hope you can use this S3 pattern with HDFS and hope it works !!

Thanks
in4



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-3-1-and-Parquet-Partitions-tp22792p22801.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



AWS-Credentials fails with org.apache.hadoop.fs.s3.S3Exception: FORBIDDEN

2015-05-07 Thread in4maniac
Hi Guys, 

I think this problem is related to : 
http://apache-spark-user-list.1001560.n3.nabble.com/AWS-Credentials-for-private-S3-reads-td8689.html

I am running pyspark 1.2.1 in AWS with my AWS credentials exported to master
node as Environmental Variables.

Halfway through my application, I get thrown with a
org.apache.hadoop.fs.s3.S3Exception: org.jets3t.service.S3ServiceException:
S3 HEAD request failed for "file path" - ResponseCode=403,
ResponseMessage=Forbidden

Here is some important information about my job: 
+ my AWS credentials exported to master node as Environmental Variables (see the sketch after this list)
+ there are no '/'s in my secret key
+ The earlier steps that use this parquet file actually complete
successfully
+ The step before the count() does the following:
   + reads the parquet file (SELECT STATEMENT)
   + maps it to an RDD
   + runs a filter on the RDD
+ The filter works as follows:
   + extracts one field from each RDD line
   + checks with a list of 40,000 hashes for presence (if field in
LIST_OF_HASHES.value)
   + LIST_OF_HASHES is a broadcast object
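
For reference, the credentials could also be pinned to the job configuration
instead of the environment (a sketch in Scala; from PySpark the same keys can
be set via sc._jsc.hadoopConfiguration(), and the key names are the point):

// Hadoop's native S3 filesystem reads these keys from the job configuration,
// removing the dependency on environment variables reaching the executors.
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", sys.env("AWS_ACCESS_KEY_ID"))
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", sys.env("AWS_SECRET_ACCESS_KEY"))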

The weirdness is that I am using this parquet file in earlier steps and it
works fine. The other confusion I have is that it only starts failing halfway
through the stage: it completes a fraction of the tasks and then starts failing.

Hoping to hear something positive. Many thanks in advance

Sahanbull

The stack trace is as follows:
>>> negativeObs.count()
[Stage 9:==>   (161 + 240) /
800]

15/05/07 07:55:59 ERROR TaskSetManager: Task 277 in stage 9.0 failed 4
times; aborting job
Traceback (most recent call last):
  File "", line 1, in 
  File "/root/spark/python/pyspark/rdd.py", line 829, in count
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/root/spark/python/pyspark/rdd.py", line 820, in sum
return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
  File "/root/spark/python/pyspark/rdd.py", line 725, in reduce
vals = self.mapPartitions(func).collect()
  File "/root/spark/python/pyspark/rdd.py", line 686, in collect
bytesInJava = self._jrdd.collect().iterator()
  File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
line 538, in __call__
  File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line
300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o139.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task
277 in stage 9.0 failed 4 times, most recent failure: Lost task 277.3 in
stage 9.0 (TID 4832, ip-172-31-1-185.ec2.internal):
org.apache.hadoop.fs.s3.S3Exception: org.jets3t.service.S3ServiceException:
S3 HEAD request failed for
'/subbucket%2Fpath%2F2Fpath%2F2Fpath%2F2Fpath%2F2Fpath%2Ffilename.parquet%2Fpart-r-349.parquet'
- ResponseCode=403, ResponseMessage=Forbidden
at
org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:122)
at sun.reflect.GeneratedMethodAccessor116.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
at org.apache.hadoop.fs.s3native.$Proxy9.retrieveMetadata(Unknown
Source)
at
org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus(NativeS3FileSystem.java:326)
at
parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:381)
at
parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:155)
at
parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:138)
at
org.apache.spark.rdd.NewHadoopRDD$$anon$1.(NewHadoopRDD.scala:135)
at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:107)
at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:69)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.sql.SchemaRDD.compute(SchemaRDD.scala:120)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at
org.apache.spark.api.python.

Re: Spark Job triggers second attempt

2015-05-07 Thread Doug Balog
I bet you are running on YARN in cluster mode.

If you are running on yarn in client mode, 
.set(“spark.yarn.maxAppAttempts”,”1”) works as you expect,
because YARN doesn’t start your app on the cluster until you call 
SparkContext().

But if you are running on yarn in cluster mode, the driver program runs on a
cluster node, so your app is already running on the cluster by the time you call .set().
To make it work in cluster mode, the property must be set on the spark-submit
command line via
"--conf spark.yarn.maxAppAttempts=1"
or --driver-java-options "-Dspark.yarn.maxAppAttempts=1"
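
For example (application class and jar are placeholders):

spark-submit --master yarn-cluster \
  --conf spark.yarn.maxAppAttempts=1 \
  --class com.example.MyApp my-app.jar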


A note should be added to running-on-yarn.html in the "Important notes" section
saying that in cluster mode you need to set spark.yarn.* properties from the
spark-submit command line.

Cheers,

Doug




> On May 7, 2015, at 2:34 AM, ÐΞ€ρ@Ҝ (๏̯͡๏)  wrote:
> 
> How i can stop Spark to stop triggering second attempt in case the first 
> fails.
> I do not want to wait for the second attempt to fail again so that i can 
> debug faster.
> 
> .set("spark.yarn.maxAppAttempts", "0") OR .set("spark.yarn.maxAppAttempts", 
> "1")
> 
> is not helping.
> 
> 
> -- 
> Deepak
> 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Predict.scala using model for clustering In reference

2015-05-07 Thread anshu shukla
Can anyone please explain -

println("Initalizaing the the KMeans model...")
val model = new
KMeansModel(ssc.sparkContext.objectFile[Vector](modelFile.toString).collect())

where modelFile is the *directory used to persist the model during training*


  REF-

https://github.com/databricks/reference-apps/blob/master/twitter_classifier/predict.md
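
For context, the training side of that reference app persists the cluster
centers roughly like this (a sketch; vectors, numClusters and numIterations
stand for the app's own training inputs):

import org.apache.spark.mllib.clustering.KMeans

// Train, then persist only the cluster centers as an object file; Predict.scala
// rebuilds the model with new KMeansModel(objectFile[Vector](...).collect()).
val model = KMeans.train(vectors, numClusters, numIterations)
sc.makeRDD(model.clusterCenters, numClusters).saveAsObjectFile(modelFile.toString)

A KMeansModel is essentially just its array of centers, which is why collecting
the object file back is enough to reconstruct it.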


-- 
Thanks & Regards,
Anshu Shukla


Blocking DStream.forEachRDD()

2015-05-07 Thread Corey Nolet
Is this something I can do? I am using a FileOutputFormat inside of the
foreachRDD call. After the output format runs, I want to do some directory
cleanup, and I want to block while I'm doing that. Is that something I can
do inside of this function? If not, where would I accomplish this on every
micro-batch interval?
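
A sketch of what I mean (dstream, the output path, and cleanupDirectories are
all hypothetical placeholders):

dstream.foreachRDD { rdd =>
  // Write this batch out (runs as a normal Spark job).
  rdd.saveAsTextFile(s"/output/batch-${System.currentTimeMillis}")
  // As far as I can tell, this closure runs synchronously on the driver once
  // per interval, so blocking on cleanup here would only delay the next batch.
  cleanupDirectories() // hypothetical helper that does the directory cleanup
}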


Re: AvroFiles

2015-05-07 Thread Michael Armbrust
I would suggest also looking at: https://github.com/databricks/spark-avro
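
A minimal sketch of that route (assuming the 1.x spark-avro API; paths are
placeholders):

import com.databricks.spark.avro._

// Read avro through the data source API, then write it back out via save().
val df = sqlContext.avroFile("hdfs:///data/input.avro")
df.save("hdfs:///data/output.avro", "com.databricks.spark.avro")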

On Wed, May 6, 2015 at 10:48 PM, ÐΞ€ρ@Ҝ (๏̯͡๏)  wrote:

> Hello,
> This is how i read Avro data.
>
> import org.apache.avro.generic.GenericData
> import org.apache.avro.generic.GenericRecord
> import org.apache.avro.mapred.AvroKey
> import org.apache.avro.Schema
> import org.apache.hadoop.io.NullWritable
> import org.apache.avro.mapreduce.AvroKeyInputFormat
>
> -- Read
> def readGenericRecords(sc: SparkContext, inputDir: String, startDate:
> Date, endDate: Date) = {
> val path = getInputPaths(inputDir, startDate, endDate)
> sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable,
> AvroKeyInputFormat[GenericRecord]](path)
> }
>
> -- Write
> protected def writeOutputRecords(detailRecords:
> RDD[(AvroKey[DetailOutputRecord], NullWritable)], outputDir: String) {
> val writeJob = new Job()
>  // Schema of data you need to write.
> val schema = SchemaUtil.viewItemOutputSchema
> AvroJob.setOutputKeySchema(writeJob, schema)
> detailRecords.saveAsNewAPIHadoopFile(outputDir,
>   classOf[AvroKey[GenericRecord]],
>   classOf[org.apache.hadoop.io.NullWritable],
>   classOf[AvroKeyOutputFormat[GenericRecord]],
>   writeJob.getConfiguration)
> }
>
>
> 
> org.apache.avro
> avro
> 1.7.7
> provided
> 
>
> It works. I do not see NPE.
>
> On Wed, May 6, 2015 at 7:50 AM, Pankaj Deshpande  wrote:
>
>> I am not using Kryo. I was using the regular sqlcontext.avrofiles to
>> open. The file loads properly with the schema. The exception happens when I
>> try to read it. Will try the Kryo serializer and see if that helps.
>> On May 5, 2015 9:02 PM, "Todd Nist"  wrote:
>>
>>> Are you using Kryo or Java serialization? I found this post useful:
>>>
>>>
>>> http://stackoverflow.com/questions/23962796/kryo-readobject-cause-nullpointerexception-with-arraylist
>>>
>>> If using kryo, you need to register the classes with kryo, something
>>> like this:
>>>
>>>
>>> conf.registerKryoClasses(Array( // registerKryoClasses is on SparkConf
>>>   classOf[ConfigurationProperty],
>>>   classOf[Event]
>>> ))
>>>
>>> Or create a registrator something like this:
>>>
>>> class ODSKryoRegistrator extends KryoRegistrator {
>>>   override def registerClasses(kryo: Kryo) {
>>> kryo.register(classOf[ConfigurationProperty], new 
>>> AvroSerializer[ConfigurationProperty]())
>>> kryo.register(classOf[Event], new AvroSerializer[Event]()))
>>>   }
>>>
>>> I encountered a similar error since several of the Avor core classes are
>>> not marked Serializable.
>>>
>>> HTH.
>>>
>>> Todd
>>>
>>> On Tue, May 5, 2015 at 7:09 PM, Pankaj Deshpande 
>>> wrote:
>>>
 Hi, I am using Spark 1.3.1 to read an avro file stored on HDFS. The avro
 file was created using Avro 1.7.7. Similar to the example mentioned in
 http://www.infoobjects.com/spark-with-avro/
 I am getting a nullPointerException on schema read. It could be an Avro
 version mismatch. Has anybody had a similar issue with Avro?


 Thanks

>>>
>>>
>
>
> --
> Deepak
>
>


Re: saveAsTable fails on Python with "Unresolved plan found"

2015-05-07 Thread Michael Armbrust
Sorry for the confusion.  SQLContext doesn't have a persistent metastore, so
it's not possible to save data as a table.  If anyone wants to contribute,
I'd welcome a new query planner strategy for SQLContext that gave a better
error message.
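
For reference, a sketch of the working path on a Hive-enabled Spark 1.2 build
(in spark-shell; HiveContext supplies the persistent metastore that saveAsTable
needs):

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
import hiveContext._ // brings in createSchemaRDD for the implicit SchemaRDD conversion

case class Person(id: String, name: String)
val people = sc.textFile("data.txt").map(_.split(",")).map(p => Person(p(0), p(1)))
people.saveAsTable("peopletable") // persists into the Hive metastore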

On Thu, May 7, 2015 at 8:41 AM, Judy Nash 
wrote:

>  Figured it out. It was because I was using HiveContext instead of
> SQLContext.
>
> FYI in case others saw the same issue.
>
>
>
> *From:* Judy Nash
> *Sent:* Thursday, May 7, 2015 7:38 AM
> *To:* 'user@spark.apache.org'
> *Subject:* RE: saveAsTable fails on Python with "Unresolved plan found"
>
>
>
> SPARK-4825  looks like
> the right bug, but it should’ve been fixed on 1.2.1.
>
>
>
> Is a similar fix needed in Python?
>
>
>
> *From:* Judy Nash
> *Sent:* Thursday, May 7, 2015 7:26 AM
> *To:* user@spark.apache.org
> *Subject:* saveAsTable fails on Python with "Unresolved plan found"
>
>
>
> Hello,
>
>
>
> I am following the tutorial code on sql programming guide
> 
> to try out Python on spark 1.2.1.
>
>
>
> The saveAsTable function works in Scala but fails in Python with “Unresolved
> plan found”.
>
>
>
> *Broken Python code:*
>
> from pyspark.sql import SQLContext, Row
>
> sqlContext = SQLContext(sc)
>
> lines = sc.textFile("data.txt")
>
> parts = lines.map(lambda l: l.split(","))
>
> people = parts.map(lambda p: Row(id=p[0], name=p[1]))
>
> schemaPeople = sqlContext.inferSchema(people)
>
> schemaPeople.saveAsTable("peopletable")
>
>
>
> saveAsTable fails with Unresolved plan found.
>
> org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved
> plan found, tree:
>
> 'CreateTableAsSelect None, pytable, false, None
>
>
>
>
>
> *This scala code works fine:*
>
> from pyspark.sql import SQLContext, Row
>
> sqlContext = SQLContext(sc)
>
> lines = sc.textFile("data.txt")
>
> parts = lines.map(lambda l: l.split(","))
>
> people = parts.map(lambda p: Row(id=p[0], name=p[1]))
>
> schemaPeople = sqlContext.inferSchema(people)
>
> schemaPeople.saveAsTable("peopletable")
>
>
>
>
>
> Is this a known issue? Or am I not using Python correctly?
>
>
>
> Thanks,
>
> Judy
>


Re: Avro to Parquet ?

2015-05-07 Thread Michael Armbrust
Spark SQL using the Data Source API can also do this with much less code.

https://github.com/databricks/spark-avro
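
As a sketch, the conversion with that package on Spark 1.3 is a one-liner each
way (paths are placeholders):

import com.databricks.spark.avro._

// Load avro through the data source API, then write parquet; Spark SQL
// carries the schema across.
val df = sqlContext.avroFile("hdfs:///input/events.avro")
df.saveAsParquetFile("hdfs:///output/events.parquet")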

On Thu, May 7, 2015 at 8:41 AM, Jonathan Coveney  wrote:

> A helpful example of how to convert:
> http://blog.cloudera.com/blog/2014/05/how-to-convert-existing-data-into-parquet/
>
> As far as performance, that depends on your data. If you have a lot of
> columns and use all of them, parquet deserialization is expensive. If you
> have a lot of columns and only need a few (or have some filters you can push down),
> the savings can be huge.
>
> 2015-05-07 11:29 GMT-04:00 ÐΞ€ρ@Ҝ (๏̯͡๏) :
>
> 1) What is the best way to convert data from Avro to Parquet so that it
>> can be later read and processed ?
>>
>> 2) Will the performance of processing (join, reduceByKey) be better if
>> both datasets are in Parquet format when compared to Avro + Sequence ?
>>
>> --
>> Deepak
>>
>>
>


Re: history server

2015-05-07 Thread Ankur Chauhan
Hi,

Sorry this may be a little off topic but I tried searching for docs on history 
server but couldn't really find much. Can someone point me to a doc or give me 
a point of reference for the use and intent of a history server?


-- Ankur

> On 7 May 2015, at 12:06, Koert Kuipers  wrote:
> 
> got it. thanks!
> 
> On Thu, May 7, 2015 at 2:52 PM, Marcelo Vanzin  wrote:
> Ah, sorry, that's definitely what Shixiong mentioned. The patch I mentioned 
> did not make it into 1.3...
> 
> On Thu, May 7, 2015 at 11:48 AM, Koert Kuipers  wrote:
> seems i got one thread spinning 100% for a while now, in 
> FsHistoryProvider.initialize(). maybe something wrong with my logs on hdfs 
> that its reading? or could it simply really take 30 mins to read all the 
> history on dhfs?
> 
> jstack:
> 
> [jstack snipped; the full dump is in Koert's original message below]

Re: Map one RDD into two RDD

2015-05-07 Thread Gerard Maas
Hi Bill,

I just found it weird that one would use parallel threads to 'filter', as
filter is lazy in Spark, and multithreading wouldn't have any effect unless
the action triggering the execution of the lineage containing such filter
is executed on a separate thread. One must have very specific
reasons/requirements to do that, beyond 'not traversing the data twice'.
The request for the code was only to help checking that.

-kr, Gerard.

On Thu, May 7, 2015 at 7:26 PM, Bill Q  wrote:

> The multi-threading code in Scala is quite simple and you can google it
> pretty easily. We used the Future framework. You can use Akka also.
>
> @Evo My concerns for filtering solution are: 1. Will rdd2.filter run
> before rdd1.filter finish? 2. We have to traverse rdd twice. Any comments?
>
>
>
> On Thursday, May 7, 2015, Evo Eftimov  wrote:
>
>> Scala is a language, Spark is an OO/Functional, Distributed Framework
>> facilitating Parallel Programming in a distributed environment
>>
>>
>>
>> Any “Scala parallelism” occurs within the Parallel Model imposed by the
>> Spark OO Framework – ie it is limited in terms of what it can achieve in
>> terms of influencing the Spark Framework behavior – that is the nature of
>> programming with/for frameworks
>>
>>
>>
>> When RDD1 and RDD2 are partitioned and different Actions applied to them
>> this will result in Parallel Pipelines / DAGs within the Spark Framework
>>
>> RDD1 = RDD.filter()
>>
>> RDD2 = RDD.filter()
>>
>>
>>
>>
>>
>> *From:* Bill Q [mailto:bill.q@gmail.com]
>> *Sent:* Thursday, May 7, 2015 4:55 PM
>> *To:* Evo Eftimov
>> *Cc:* user@spark.apache.org
>> *Subject:* Re: Map one RDD into two RDD
>>
>>
>>
>> Thanks for the replies. We decided to use concurrency in Scala to do the
>> two mappings using the same source RDD in parallel. So far, it seems to be
>> working. Any comments?
>>
>> On Wednesday, May 6, 2015, Evo Eftimov  wrote:
>>
>> RDD1 = RDD.filter()
>>
>> RDD2 = RDD.filter()
>>
>>
>>
>> *From:* Bill Q [mailto:bill.q@gmail.com]
>> *Sent:* Tuesday, May 5, 2015 10:42 PM
>> *To:* user@spark.apache.org
>> *Subject:* Map one RDD into two RDD
>>
>>
>>
>> Hi all,
>>
>> I have a large RDD that I map a function to it. Based on the nature of
>> each record in the input RDD, I will generate two types of data. I would
>> like to save each type into its own RDD. But I can't seem to find an
>> efficient way to do it. Any suggestions?
>>
>>
>>
>> Many thanks.
>>
>>
>>
>>
>>
>> Bill
>>
>>
>>
>> --
>>
>> Many thanks.
>>
>> Bill
>>
>>
>>
>>
>>
>> --
>>
>> Many thanks.
>>
>> Bill
>>
>>
>>
>
>
> --
> Many thanks.
>
>
> Bill
>
>


Re: history server

2015-05-07 Thread Koert Kuipers
got it. thanks!

On Thu, May 7, 2015 at 2:52 PM, Marcelo Vanzin  wrote:

> Ah, sorry, that's definitely what Shixiong mentioned. The patch I
> mentioned did not make it into 1.3...
>
> On Thu, May 7, 2015 at 11:48 AM, Koert Kuipers  wrote:
>
>> seems i got one thread spinning 100% for a while now, in
>> FsHistoryProvider.initialize(). maybe something wrong with my logs on hdfs
>> that its reading? or could it simply really take 30 mins to read all the
>> history on dhfs?
>>
>> jstack:
>>
>> [jstack snipped; the full dump is in Koert's original message below]

Re: Spark unit test fails

2015-05-07 Thread NoWisdom
I'm also getting the same error.

Any ideas?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-unit-test-fails-tp22368p22798.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: history server

2015-05-07 Thread Shixiong Zhu
SPARK-5522 is really cool. Didn't notice it.

Best Regards,
Shixiong Zhu

2015-05-07 11:36 GMT-07:00 Marcelo Vanzin :

> That shouldn't be true in 1.3 (see SPARK-5522).
>
> On Thu, May 7, 2015 at 11:33 AM, Shixiong Zhu  wrote:
>
>> The history server may need several hours to start if you have a lot of
>> event logs. Is it stuck, or still replaying logs?
>>
>> Best Regards,
>> Shixiong Zhu
>>
>> 2015-05-07 11:03 GMT-07:00 Marcelo Vanzin :
>>
>> Can you get a jstack for the process? Maybe it's stuck somewhere.
>>>
>>> On Thu, May 7, 2015 at 11:00 AM, Koert Kuipers 
>>> wrote:
>>>
 i am trying to launch the spark 1.3.1 history server on a secure
 cluster.

 i can see in the logs that it successfully logs into kerberos, and it
 is replaying all the logs, but i never see the log message that indicate
 the web server is started (i should see something like "Successfully
 started service on port 18080." or "Started HistoryServer at
 http://somehost:18080"). yet the daemon stays alive...

 any idea why the history server would never start the web service?

 thanks!


>>>
>>>
>>> --
>>> Marcelo
>>>
>>
>>
>
>
> --
> Marcelo
>


Re: history server

2015-05-07 Thread Marcelo Vanzin
Ah, sorry, that's definitely what Shixiong mentioned. The patch I mentioned
did not make it into 1.3...

On Thu, May 7, 2015 at 11:48 AM, Koert Kuipers  wrote:

> seems i got one thread spinning 100% for a while now, in
> FsHistoryProvider.initialize(). maybe something wrong with my logs on hdfs
> that its reading? or could it simply really take 30 mins to read all the
> history on dhfs?
>
> jstack:
>
> [jstack snipped; the full dump is in Koert's message below]

Re: history server

2015-05-07 Thread Koert Kuipers
seems i got one thread spinning 100% for a while now, in
FsHistoryProvider.initialize(). maybe something wrong with my logs on hdfs
that it's reading? or could it simply really take 30 mins to read all the
history on hdfs?

jstack:

Deadlock Detection:

No deadlocks found.

Thread 2272: (state = BLOCKED)
 - sun.misc.Unsafe.park(boolean, long) @bci=0 (Compiled frame; information
may be imprecise)
 - java.util.concurrent.locks.LockSupport.parkNanos(java.lang.Object, long)
@bci=20, line=226 (Compiled frame)
 -
java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(java.util.concurrent.SynchronousQueue$TransferStack$SNode,
boolean, long) @bci=174, line=460 (Compiled frame)
 -
java.util.concurrent.SynchronousQueue$TransferStack.transfer(java.lang.Object,
boolean, long) @bci=102, line=359 (Interpreted frame)
 - java.util.concurrent.SynchronousQueue.poll(long,
java.util.concurrent.TimeUnit) @bci=11, line=942 (Interpreted frame)
 - java.util.concurrent.ThreadPoolExecutor.getTask() @bci=141, line=1068
(Interpreted frame)
 -
java.util.concurrent.ThreadPoolExecutor.runWorker(java.util.concurrent.ThreadPoolExecutor$Worker)
@bci=26, line=1130 (Interpreted frame)
 - java.util.concurrent.ThreadPoolExecutor$Worker.run() @bci=5, line=615
(Interpreted frame)
 - java.lang.Thread.run() @bci=11, line=745 (Interpreted frame)


Thread 1986: (state = BLOCKED)
 - java.lang.Thread.sleep(long) @bci=0 (Interpreted frame)
 - org.apache.hadoop.hdfs.PeerCache.run() @bci=41, line=250 (Interpreted
frame)
 -
org.apache.hadoop.hdfs.PeerCache.access$000(org.apache.hadoop.hdfs.PeerCache)
@bci=1, line=41 (Interpreted frame)
 - org.apache.hadoop.hdfs.PeerCache$1.run() @bci=4, line=119 (Interpreted
frame)
 - java.lang.Thread.run() @bci=11, line=745 (Interpreted frame)


Thread 1970: (state = BLOCKED)


Thread 1969: (state = BLOCKED)
 - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
 - java.lang.ref.ReferenceQueue.remove(long) @bci=44, line=135 (Interpreted
frame)
 - java.lang.ref.ReferenceQueue.remove() @bci=2, line=151 (Interpreted
frame)
 - java.lang.ref.Finalizer$FinalizerThread.run() @bci=36, line=209
(Interpreted frame)


Thread 1968: (state = BLOCKED)
 - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
 - java.lang.Object.wait() @bci=2, line=503 (Interpreted frame)
 - java.lang.ref.Reference$ReferenceHandler.run() @bci=46, line=133
(Interpreted frame)


Thread 1958: (state = IN_VM)
 - java.lang.Throwable.fillInStackTrace(int) @bci=0 (Compiled frame;
information may be imprecise)
 - java.lang.Throwable.fillInStackTrace() @bci=16, line=783 (Compiled frame)
 - java.lang.Throwable.(java.lang.String, java.lang.Throwable)
@bci=24, line=287 (Compiled frame)
 - java.lang.Exception.(java.lang.String, java.lang.Throwable)
@bci=3, line=84 (Compiled frame)
 - org.json4s.package$MappingException.(java.lang.String,
java.lang.Exception) @bci=13, line=56 (Compiled frame)
 - org.json4s.reflect.package$.fail(java.lang.String, java.lang.Exception)
@bci=6, line=96 (Compiled frame)
 - org.json4s.Extraction$.convert(org.json4s.JsonAST$JValue,
org.json4s.reflect.ScalaType, org.json4s.Formats, scala.Option) @bci=2447,
line=554 (Compiled frame)
 - org.json4s.Extraction$.extract(org.json4s.JsonAST$JValue,
org.json4s.reflect.ScalaType, org.json4s.Formats) @bci=796, line=331
(Compiled frame)
 - org.json4s.Extraction$.extract(org.json4s.JsonAST$JValue,
org.json4s.Formats, scala.reflect.Manifest) @bci=10, line=42 (Compiled
frame)
 - org.json4s.Extraction$.extractOpt(org.json4s.JsonAST$JValue,
org.json4s.Formats, scala.reflect.Manifest) @bci=7, line=54 (Compiled frame)
 - org.json4s.ExtractableJsonAstNode.extractOpt(org.json4s.Formats,
scala.reflect.Manifest) @bci=9, line=40 (Compiled frame)
 -
org.apache.spark.util.JsonProtocol$.shuffleWriteMetricsFromJson(org.json4s.JsonAST$JValue)
@bci=116, line=702 (Compiled frame)
 -
org.apache.spark.util.JsonProtocol$$anonfun$taskMetricsFromJson$2.apply(org.json4s.JsonAST$JValue)
@bci=4, line=670 (Compiled frame)
 -
org.apache.spark.util.JsonProtocol$$anonfun$taskMetricsFromJson$2.apply(java.lang.Object)
@bci=5, line=670 (Compiled frame)
 - scala.Option.map(scala.Function1) @bci=22, line=145 (Compiled frame)
 -
org.apache.spark.util.JsonProtocol$.taskMetricsFromJson(org.json4s.JsonAST$JValue)
@bci=414, line=670 (Compiled frame)
 -
org.apache.spark.util.JsonProtocol$.taskEndFromJson(org.json4s.JsonAST$JValue)
@bci=174, line=508 (Compiled frame)
 -
org.apache.spark.util.JsonProtocol$.sparkEventFromJson(org.json4s.JsonAST$JValue)
@bci=389, line=464 (Compiled frame)
 -
org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$1.apply(java.lang.String)
@bci=34, line=51 (Compiled frame)
 -
org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$1.apply(java.lang.Object)
@bci=5, line=49 (Compiled frame)
 - scala.collection.Iterator$class.foreach(scala.collection.Iterator,
scala.Function1) @bci=16, line=743 (Compiled frame)
 - scala.collection.AbstractIterator.foreach(scala.Function1) @bci=2,
line=1177 (Inte

Re: history server

2015-05-07 Thread Marcelo Vanzin
That shouldn't be true in 1.3 (see SPARK-5522).

On Thu, May 7, 2015 at 11:33 AM, Shixiong Zhu  wrote:

> The history server may need several hours to start if you have a lot of
> event logs. Is it stuck, or still replaying logs?
>
> Best Regards,
> Shixiong Zhu
>
> 2015-05-07 11:03 GMT-07:00 Marcelo Vanzin :
>
> Can you get a jstack for the process? Maybe it's stuck somewhere.
>>
>> On Thu, May 7, 2015 at 11:00 AM, Koert Kuipers  wrote:
>>
>>> i am trying to launch the spark 1.3.1 history server on a secure cluster.
>>>
>>> i can see in the logs that it successfully logs into kerberos, and it is
>>> replaying all the logs, but i never see the log message that indicate the
>>> web server is started (i should see something like "Successfully started
>>> service on port 18080." or "Started HistoryServer at
>>> http://somehost:18080"). yet the daemon stays alive...
>>>
>>> any idea why the history server would never start the web service?
>>>
>>> thanks!
>>>
>>>
>>
>>
>> --
>> Marcelo
>>
>
>


-- 
Marcelo


Re: history server

2015-05-07 Thread Shixiong Zhu
The history server may need several hours to start if you have a lot of
event logs. Is it stuck, or still replaying logs?

Best Regards,
Shixiong Zhu

2015-05-07 11:03 GMT-07:00 Marcelo Vanzin :

> Can you get a jstack for the process? Maybe it's stuck somewhere.
>
> On Thu, May 7, 2015 at 11:00 AM, Koert Kuipers  wrote:
>
>> i am trying to launch the spark 1.3.1 history server on a secure cluster.
>>
>> i can see in the logs that it successfully logs into kerberos, and it is
>> replaying all the logs, but i never see the log message that indicate the
>> web server is started (i should see something like "Successfully started
>> service on port 18080." or "Started HistoryServer at
>> http://somehost:18080"). yet the daemon stays alive...
>>
>> any idea why the history server would never start the web service?
>>
>> thanks!
>>
>>
>
>
> --
> Marcelo
>


Re: history server

2015-05-07 Thread Marcelo Vanzin
Can you get a jstack for the process? Maybe it's stuck somewhere.

On Thu, May 7, 2015 at 11:00 AM, Koert Kuipers  wrote:

> i am trying to launch the spark 1.3.1 history server on a secure cluster.
>
> i can see in the logs that it successfully logs into kerberos, and it is
> replaying all the logs, but i never see the log message that indicate the
> web server is started (i should see something like "Successfully started
> service on port 18080." or "Started HistoryServer at http://somehost:18080").
> yet the daemon stays alive...
>
> any idea why the history server would never start the web service?
>
> thanks!
>
>


-- 
Marcelo


history server

2015-05-07 Thread Koert Kuipers
i am trying to launch the spark 1.3.1 history server on a secure cluster.

i can see in the logs that it successfully logs into kerberos, and it is
replaying all the logs, but i never see the log message that indicate the
web server is started (i should see something like "Successfully started
service on port 18080." or "Started HistoryServer at http://somehost:18080").
yet the daemon stays alive...

any idea why the history server would never start the web service?

thanks!


Re: Map one RDD into two RDD

2015-05-07 Thread Bill Q
The multi-threading code in Scala is quite simple and you can google it
pretty easily. We used the Future framework. You can use Akka also.

@Evo My concerns with the filtering solution are: 1. Will rdd2.filter run before
rdd1.filter finishes? 2. We have to traverse the rdd twice. Any comments?
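
A condensed sketch of that approach (isTypeA and the output paths are made up):

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

val source = rdd.cache() // both jobs share one computed copy of the input

// Submitting the two actions from separate threads lets their jobs overlap;
// SparkContext is thread-safe for job submission.
val jobA = Future { source.filter(r => isTypeA(r)).saveAsTextFile("out/typeA") }
val jobB = Future { source.filter(r => !isTypeA(r)).saveAsTextFile("out/typeB") }

Await.result(Future.sequence(Seq(jobA, jobB)), Duration.Inf)

Whether the two jobs actually run concurrently still depends on free executor
capacity and the scheduler mode (FAIR helps).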


On Thursday, May 7, 2015, Evo Eftimov  wrote:

> Scala is a language, Spark is an OO/Functional, Distributed Framework
> facilitating Parallel Programming in a distributed environment
>
>
>
> Any “Scala parallelism” occurs within the Parallel Model imposed by the
> Spark OO Framework – ie it is limited in terms of what it can achieve in
> terms of influencing the Spark Framework behavior – that is the nature of
> programming with/for frameworks
>
>
>
> When RDD1 and RDD2 are partitioned and different Actions applied to them
> this will result in Parallel Pipelines / DAGs within the Spark Framework
>
> RDD1 = RDD.filter()
>
> RDD2 = RDD.filter()
>
>
>
>
>
> *From:* Bill Q [mailto:bill.q@gmail.com
> ]
> *Sent:* Thursday, May 7, 2015 4:55 PM
> *To:* Evo Eftimov
> *Cc:* user@spark.apache.org
> 
> *Subject:* Re: Map one RDD into two RDD
>
>
>
> Thanks for the replies. We decided to use concurrency in Scala to do the
> two mappings using the same source RDD in parallel. So far, it seems to be
> working. Any comments?
>
> On Wednesday, May 6, 2015, Evo Eftimov  > wrote:
>
> RDD1 = RDD.filter()
>
> RDD2 = RDD.filter()
>
>
>
> *From:* Bill Q [mailto:bill.q@gmail.com]
> *Sent:* Tuesday, May 5, 2015 10:42 PM
> *To:* user@spark.apache.org
> *Subject:* Map one RDD into two RDD
>
>
>
> Hi all,
>
> I have a large RDD that I map a function to it. Based on the nature of
> each record in the input RDD, I will generate two types of data. I would
> like to save each type into its own RDD. But I can't seem to find an
> efficient way to do it. Any suggestions?
>
>
>
> Many thanks.
>
>
>
>
>
> Bill
>
>
>
> --
>
> Many thanks.
>
> Bill
>
>
>
>
>
> --
>
> Many thanks.
>
> Bill
>
>
>


-- 
Many thanks.


Bill


Re: Partition Case Class RDD without ParRDDFunctions

2015-05-07 Thread Jonathan Coveney
what about .groupBy doesn't work for you?
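
If it is mainly the tuple boilerplate you want to avoid, a sketch (reusing the
caseClassRdd name from the question; the partition count is arbitrary):

import org.apache.spark.HashPartitioner

// keyBy builds the (hashCode, record) pairs, partitionBy shuffles on them,
// and values drops the keys again. Note: .values is a plain map, so the
// partitioner metadata is lost; keep the pair form if downstream ops rely on it.
val partitioned = caseClassRdd
  .keyBy(_.hashCode)
  .partitionBy(new HashPartitioner(128))
  .values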

2015-05-07 8:17 GMT-04:00 Night Wolf :

> MyClass is a basic scala case class (using Spark 1.3.1);
>
> case class Result(crn: Long, pid: Int, promoWk: Int, windowKey: Int, ipi: 
> Double) {
>   override def hashCode(): Int = crn.hashCode()
> }
>
>
> On Wed, May 6, 2015 at 8:09 PM, ayan guha  wrote:
>
>> What does your MyClass look like? I was experimenting with the Row class in
>> Python, and apparently partitionBy automatically takes the first column as the key.
>> However, I am not sure how you can access a part of an object without
>> deserializing it (either explicitly or Spark doing it for you)
>>
>> On Wed, May 6, 2015 at 7:14 PM, Night Wolf 
>> wrote:
>>
>>> Hi,
>>>
>>> If I have an RDD[MyClass] and I want to partition it by the hash code of
>>> MyClass for performance reasons, is there any way to do this without
>>> converting it into a PairRDD RDD[(K,V)] and calling partitionBy???
>>>
>>> Mapping it to a tuple2 seems like a waste of space/computation.
>>>
>>> It looks like PairRDDFunctions.partitionBy() uses a
>>> ShuffleRDD[K,V,C], which requires K, V, C? Could I create a new
>>> ShuffleRDD[MyClass,MyClass,MyClass](caseClassRdd, new HashPartitioner)?
>>>
>>> Cheers,
>>> N
>>>
>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>


RE: Sort (order by) of the big dataset

2015-05-07 Thread Ulanov, Alexander
avulanov.blogspot.com, though it does not have more on this particular issue 
than I already posted.

From: Ted Yu [mailto:yuzhih...@gmail.com]
Sent: Thursday, May 07, 2015 6:25 AM
To: Ulanov, Alexander
Cc: user@spark.apache.org
Subject: Re: Sort (order by) of the big dataset

Where can I find your blog ?

Thanks


On Apr 29, 2015, at 7:14 PM, Ulanov, Alexander 
mailto:alexander.ula...@hp.com>> wrote:
After a day of debugging (actually, more), I can answer my own question:
The problem is that the default value 200 of “spark.sql.shuffle.partitions” is
too small for sorting 2B rows. It was hard to realize because Spark executors
just crash with various exceptions, one by one. The other takeaway is that
Dataframe “order by” and RDD.sortBy are implemented in different ways. BTW,
why?

Small synthetic test (copied from my blog):
Create 2B rows of MyRecord within 2000 partitions, so each partition will have
1M rows.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
case class MyRecord(time: Double, id: String)
val rdd = sc.parallelize(1 to 2000, 2000).flatMap(x =>
  Seq.fill(1000000)(MyRecord(util.Random.nextDouble, "xxx")))

Let's sort this RDD by time:
val sorted = rdd.sortBy(x => x.time)
sorted.count

It finished in about 8 minutes on my cluster of 8 nodes. Everything's fine. You 
can also check tasks that were completed in Spark web UI. The number of 
reducers was equal to the number of partitions, i.e. 2000

Lets convert the original RDD to Dataframe and sort again:
val df = sqlContext.createDataFrame(rdd)
df.registerTempTable("data")
val result = sqlContext.sql("select * from data order by time")
result.count

It will run for a while and then crash. If you check tasks in the Spark Web UI, 
you will see that some of them were cancelled due to lost executors 
(ExecutorLost) due to some strange exceptions. It is really hard to trace back
which executor was the first to be lost; the others follow it like a house of cards.
What's the problem? The number of reducers. For the first task it is equal to 
the number of partitions, i.e. 2000, but for the second it switched to 200.

From: Ulanov, Alexander
Sent: Wednesday, April 29, 2015 1:08 PM
To: user@spark.apache.org
Subject: Sort (order by) of the big dataset

Hi,

I have a 2 billion record dataset with schema <…>. It is stored in Parquet format in HDFS, size 23GB. Specs: Spark
1.3, Hadoop 1.2.1, 8 nodes with Xeon 16GB RAM, 1TB disk space, each node has 3 
workers with 3GB memory.

I keep failing to sort the mentioned dataset in Spark. I do the following:
val pf = sqlContext.parquetFile(“hdfs://my.net/data.parquet”)
pf.registerTempTable(“data”)
val sorted = sqlContext.sql(“select * from data order by time”)
sorted.saveAsParquetFile(“hdfs://my.net/data-sorted.parquet”)

Spark starts to execute tasks and then errors like “Executor Lost” pop up in the
web UI (task mapPartitions at Exchange.scala and runJob at newParquet.scala), 
giving different types of Exceptions in explanation. My thinking is that the 
main problem is with “GC overhead limit” exception however I observe exceptions 
related to connection time out and shuffling write 
(java.lang.IllegalStateException: Shutdown in progress; 
org.apache.spark.shuffle.FetchFailedException: java.io.FileNotFoundException ).

What I tried:
1) Tried “order by eventId” and “order by eventId, time”, with the
same result.
2) Looked at the shuffle parameters, but the defaults do make sense.
3) Tried to repartition the data I am loading from Parquet: val pf3000 =
pf.repartition(3000), in order to get smaller chunks of data passed to executors
(originally there are 300 partitions). It did not help either. Surprisingly,
this dataset takes 50GB on HDFS versus the 23GB of the original.
4) Tried to have 8 workers instead of 24 and gave them 10G of memory. It did not
help.

Could you suggest what might be the problem and what is the workaround? Just in 
case, I cannot have more RAM or more machines ☺

Best  regards, Alexander


RE: Sort (order by) of the big dataset

2015-05-07 Thread Ulanov, Alexander
The answer for Spark SQL “order by” is setting spark.sql.shuffle.partitions to
a bigger number. For RDD.sortBy it works out of the box if the RDD has enough
partitions.
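
Concretely, a sketch (2000 matches the partition count used in the experiment):

// Raise the number of reducers Spark SQL uses for the sort's exchange.
sqlContext.setConf("spark.sql.shuffle.partitions", "2000")
val result = sqlContext.sql("select * from data order by time")
result.count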

From: Night Wolf [mailto:nightwolf...@gmail.com]
Sent: Thursday, May 07, 2015 5:26 AM
To: Ulanov, Alexander
Cc: user@spark.apache.org
Subject: Re: Sort (order by) of the big dataset

What was the answer, was it only setting spark.sql.shuffle.partitions?

On Thu, Apr 30, 2015 at 12:14 PM, Ulanov, Alexander 
mailto:alexander.ula...@hp.com>> wrote:
[quoted message snipped; the full text appears in the reply to Ted Yu above]



RE: Map one RDD into two RDD

2015-05-07 Thread Evo Eftimov
Scala is a language; Spark is an OO/Functional, Distributed Framework
facilitating Parallel Programming in a distributed environment.

 

Any “Scala parallelism” occurs within the Parallel Model imposed by the Spark
OO Framework, i.e. it is limited in what it can achieve in terms of
influencing the Spark Framework's behavior; that is the nature of programming
with/for frameworks.

 

When RDD1 and RDD2 are partitioned and different Actions are applied to them,
this will result in Parallel Pipelines / DAGs within the Spark Framework:

RDD1 = RDD.filter()

RDD2 = RDD.filter()

 

 

From: Bill Q [mailto:bill.q@gmail.com] 
Sent: Thursday, May 7, 2015 4:55 PM
To: Evo Eftimov
Cc: user@spark.apache.org
Subject: Re: Map one RDD into two RDD

 

Thanks for the replies. We decided to use concurrency in Scala to do the two 
mappings using the same source RDD in parallel. So far, it seems to be working. 
Any comments?

On Wednesday, May 6, 2015, Evo Eftimov  wrote:

RDD1 = RDD.filter()

RDD2 = RDD.filter()

 

From: Bill Q [mailto:bill.q@gmail.com 
 ] 
Sent: Tuesday, May 5, 2015 10:42 PM
To: user@spark.apache.org 
 
Subject: Map one RDD into two RDD

 

Hi all,

I have a large RDD that I map a function to it. Based on the nature of each 
record in the input RDD, I will generate two types of data. I would like to 
save each type into its own RDD. But I can't seem to find an efficient way to 
do it. Any suggestions?

 

Many thanks.

 

 

Bill



-- 

Many thanks.

Bill

 



-- 

Many thanks.



Bill

 



Re: Implicit matrix factorization returning different results between spark 1.2.0 and 1.3.0

2015-05-07 Thread Ravi Mody
After thinking about it more, I do think weighting lambda by sum_i cij is
the equivalent of the ALS-WR paper's approach for the implicit case. This
provides scale-invariance for varying products/users and for varying ratings,
and should behave well for all alphas. What do you guys think?
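
Written out in the notation of Xiangrui's sub-problem quoted further down, the
proposal replaces the fixed scaling X with the total confidence mass for item j:

min_{v_j} \sum_i c_ij (p_ij - u_i^T v_j)^2 + lambda * (\sum_i c_ij) * \|v_j\|_2^2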

On Wed, May 6, 2015 at 12:29 PM, Ravi Mody  wrote:

> Whoops I just saw this thread, it got caught in my spam filter. Thanks for
> looking into this Xiangrui and Sean.
>
> The implicit situation does seem fairly complicated to me. The cost
> function (not including the regularization term) is affected both by the
> number of ratings and by the number of user/products. As we increase alpha
> the contribution to the cost function from the number of users/products
> diminishes compared to the contribution from the number of ratings. So
> large alphas seem to favor the weighted-lambda approach, even though it's
> not a perfect match. Smaller alphas favor Xiangrui's 1.3.0 approach, but
> again it's not a perfect match.
>
> I believe low alphas won't work well with regularization because both
> terms in the cost function will just push everything to zero. Some of my
> experiments confirm this. This leads me to think that weighted-lambda would
> work better in practice, but I have no evidence of this. It may make sense
> to weight lambda by sum_i cij instead?
>
>
>
>
>
> On Wed, Apr 1, 2015 at 7:59 PM, Xiangrui Meng  wrote:
>
>> Ravi, we just merged https://issues.apache.org/jira/browse/SPARK-6642
>> and used the same lambda scaling as in 1.2. The change will be
>> included in Spark 1.3.1, which will be released soon. Thanks for
>> reporting this issue! -Xiangrui
>>
>> On Tue, Mar 31, 2015 at 8:53 PM, Xiangrui Meng  wrote:
>> > I created a JIRA for this:
>> > https://issues.apache.org/jira/browse/SPARK-6637. Since we don't have
>> > a clear answer about how the scaling should be handled. Maybe the best
>> > solution for now is to switch back to the 1.2 scaling. -Xiangrui
>> >
>> > On Tue, Mar 31, 2015 at 2:50 PM, Sean Owen  wrote:
>> >> Ah yeah I take your point. The squared error term is over the whole
>> >> user-item matrix, technically, in the implicit case. I suppose I am
>> >> used to assuming that the 0 terms in this matrix are weighted so much
>> >> less (because alpha is usually large-ish) that they're almost not
>> >> there, but they are. So I had just used the explicit formulation.
>> >>
>> >> I suppose the result is kind of scale invariant, but not exactly. I
>> >> had not prioritized this property since I had generally built models
>> >> on the full data set and not a sample, and had assumed that lambda
>> >> would need to be retuned over time as the input grew anyway.
>> >>
>> >> So, basically I don't know anything more than you do, sorry!
>> >>
>> >> On Tue, Mar 31, 2015 at 10:41 PM, Xiangrui Meng 
>> wrote:
>> >>> Hey Sean,
>> >>>
>> >>> That is true for explicit model, but not for implicit. The ALS-WR
>> >>> paper doesn't cover the implicit model. In implicit formulation, a
>> >>> sub-problem (for v_j) is:
>> >>>
>> >>> min_{v_j} \sum_i c_ij (p_ij - u_i^T v_j)^2 + lambda * X * \|v_j\|_2^2
>> >>>
>> >>> This is a sum for all i but not just the users who rate item j. In
>> >>> this case, if we set X=m_j, the number of observed ratings for item j,
>> >>> it is not really scale invariant. We have #users user vectors in the
>> >>> least squares problem but only penalize lambda * #ratings. I was
>> >>> suggesting using lambda * m directly for implicit model to match the
>> >>> number of vectors in the least squares problem. Well, this is my
>> >>> theory. I don't find any public work about it.
>> >>>
>> >>> Best,
>> >>> Xiangrui
>> >>>
>> >>> On Tue, Mar 31, 2015 at 5:17 AM, Sean Owen 
>> wrote:
>>  I had always understood the formulation to be the first option you
>>  describe. Lambda is scaled by the number of items the user has rated
>> /
>>  interacted with. I think the goal is to avoid fitting the tastes of
>>  prolific users disproportionately just because they have many ratings
>>  to fit. This is what's described in the ALS-WR paper we link to on
>> the
>>  Spark web site, in equation 5
>>  (
>> http://www.grappa.univ-lille3.fr/~mary/cours/stats/centrale/reco/paper/MatrixFactorizationALS.pdf
>> )
>> 
>>  I think this also gets you the scale-invariance? For every additional
>>  rating from user i to product j, you add one new term to the
>>  squared-error sum, (r_ij - u_i . m_j)^2, but also, you'd increase the
>>  regularization term by lambda * (|u_i|^2 + |m_j|^2)  They are at
>> least
>>  both increasing about linearly as ratings increase. If the
>>  regularization term is multiplied by the total number of users and
>>  products in the model, then it's fixed.
>> 
>>  I might misunderstand you and/or be speaking about something slightly
>>  different when it comes to invariance. But FWIW I had always
>>  understood the regularization to

Re: Map one RDD into two RDD

2015-05-07 Thread Gerard Maas
Hi Bill,

Could you show a snippet of code to illustrate your choice?

-Gerard.

On Thu, May 7, 2015 at 5:55 PM, Bill Q  wrote:

> Thanks for the replies. We decided to use concurrency in Scala to do the
> two mappings using the same source RDD in parallel. So far, it seems to be
> working. Any comments?
>
>
> On Wednesday, May 6, 2015, Evo Eftimov  wrote:
>
>> RDD1 = RDD.filter()
>>
>> RDD2 = RDD.filter()
>>
>>
>>
>> *From:* Bill Q [mailto:bill.q@gmail.com]
>> *Sent:* Tuesday, May 5, 2015 10:42 PM
>> *To:* user@spark.apache.org
>> *Subject:* Map one RDD into two RDD
>>
>>
>>
>> Hi all,
>>
>> I have a large RDD that I map a function to it. Based on the nature of
>> each record in the input RDD, I will generate two types of data. I would
>> like to save each type into its own RDD. But I can't seem to find an
>> efficient way to do it. Any suggestions?
>>
>>
>>
>> Many thanks.
>>
>>
>>
>>
>>
>> Bill
>>
>>
>>
>> --
>>
>> Many thanks.
>>
>> Bill
>>
>>
>>
>
>
> --
> Many thanks.
>
>
> Bill
>
>


Re: branch-1.4 scala 2.11

2015-05-07 Thread Iulian Dragoș
There's an open PR to fix it: https://github.com/apache/spark/pull/5966

On Thu, May 7, 2015 at 6:07 PM, Koert Kuipers  wrote:

> i am having no luck using the 1.4 branch with scala 2.11
>
> $ build/mvn -DskipTests -Pyarn -Dscala-2.11 -Pscala-2.11 clean package
>
> [error]
> /home/koert/src/opensource/spark/core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala:78:
> in object RDDOperationScope, multiple overloaded alternatives of method
> withScope define default arguments.
> [error] private[spark] object RDDOperationScope {
> [error]
>



-- 

--
Iulian Dragos

--
Reactive Apps on the JVM
www.typesafe.com


branch-1.4 scala 2.11

2015-05-07 Thread Koert Kuipers
i am having no luck using the 1.4 branch with scala 2.11

$ build/mvn -DskipTests -Pyarn -Dscala-2.11 -Pscala-2.11 clean package

[error]
/home/koert/src/opensource/spark/core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala:78:
in object RDDOperationScope, multiple overloaded alternatives of method
withScope define default arguments.
[error] private[spark] object RDDOperationScope {
[error]


CompositeInputFormat implementation in Spark

2015-05-07 Thread Bill Q
Hi,
We are trying to join two sets of data. One of them is smaller and pretty
stable. The other data set is volatile and much larger. But neither can be
loaded in memory.

So our idea is to pre-sort the smaller data set and cache it in multiple
partitions. And we use the same logic to sort and partition the larger data
set. So in theory, records in the same key range will be on the same node
after the above steps are done. And then we join them together. By doing so,
we only need to sort and partition the larger data set every time it comes in.

The question is: is there an implementation of this in Spark already? I know
this can be done in Hadoop M/R using CompositeInputFormat, but I'm not sure
whether Spark already has a counterpart.
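
For reference, a sketch of the pre-partition-and-cache pattern using Spark's
built-in partitioning (keys, values and the partition count are placeholder
examples); when both sides of a join share the same partitioner, the join is
narrow on the pre-partitioned side:

import org.apache.spark.HashPartitioner
import org.apache.spark.storage.StorageLevel

val part = new HashPartitioner(512) // partition count: arbitrary choice

// stable, smaller side: partition once and keep it around
val small = sc.parallelize(Seq((1, "a"), (2, "b")))
  .partitionBy(part)
  .persist(StorageLevel.MEMORY_AND_DISK)

// volatile side: partition each incoming batch the same way, then join;
// since both RDDs share `part`, only the new batch gets shuffled
val big = sc.parallelize(Seq((1, 10.0), (2, 20.0))).partitionBy(part)
val joined = big.join(small)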

Many thanks.


Bill


-- 
Many thanks.


Bill


Re: Map one RDD into two RDD

2015-05-07 Thread Bill Q
Thanks for the replies. We decided to use concurrency in Scala to do the
two mappings using the same source RDD in parallel. So far, it seems to be
working. Any comments?

On Wednesday, May 6, 2015, Evo Eftimov  wrote:

> RDD1 = RDD.filter()
>
> RDD2 = RDD.filter()
>
>
>
> *From:* Bill Q [mailto:bill.q@gmail.com
> ]
> *Sent:* Tuesday, May 5, 2015 10:42 PM
> *To:* user@spark.apache.org
> 
> *Subject:* Map one RDD into two RDD
>
>
>
> Hi all,
>
> I have a large RDD that I map a function to it. Based on the nature of
> each record in the input RDD, I will generate two types of data. I would
> like to save each type into its own RDD. But I can't seem to find an
> efficient way to do it. Any suggestions?
>
>
>
> Many thanks.
>
>
>
>
>
> Bill
>
>
>
> --
>
> Many thanks.
>
> Bill
>
>
>


-- 
Many thanks.


Bill


Re: Avro to Parquet ?

2015-05-07 Thread Jonathan Coveney
A helpful example of how to convert:
http://blog.cloudera.com/blog/2014/05/how-to-convert-existing-data-into-parquet/

As far as performance, that depends on your data. If you have a lot of
columns and use all of them, Parquet deserialization is expensive. If you
have a lot of columns and only need a few (or have some filters you can push
down), the savings can be huge.
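
For the conversion itself, a minimal Scala sketch (this assumes the
third-party spark-avro package is on the classpath; the paths are
placeholders):

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
// read Avro through the spark-avro data source, write back out as Parquet
val df = sqlContext.load("hdfs:///data/events.avro", "com.databricks.spark.avro")
df.saveAsParquetFile("hdfs:///data/events.parquet")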

2015-05-07 11:29 GMT-04:00 ÐΞ€ρ@Ҝ (๏̯͡๏) :

> 1) What is the best way to convert data from Avro to Parquet so that it
> can be later read and processed ?
>
> 2) Will the performance of processing (join, reduceByKey) be better if
> both datasets are in Parquet format when compared to Avro + Sequence ?
>
> --
> Deepak
>
>


RE: saveAsTable fails on Python with "Unresolved plan found"

2015-05-07 Thread Judy Nash
Figured it out. It was because I was using SQLContext instead of HiveContext.
FYI in case others saw the same issue.

From: Judy Nash
Sent: Thursday, May 7, 2015 7:38 AM
To: 'user@spark.apache.org'
Subject: RE: saveAsTable fails on Python with "Unresolved plan found"

SPARK-4825 looks like the 
right bug, but it should've been fixed in 1.2.1.

Is a similar fix needed in Python?

From: Judy Nash
Sent: Thursday, May 7, 2015 7:26 AM
To: user@spark.apache.org
Subject: saveAsTable fails on Python with "Unresolved plan found"

Hello,

I am following the tutorial code on sql programming 
guide
 to try out Python on spark 1.2.1.

SaveAsTable function works in Scala but fails in Python with "Unresolved plan 
found".

Broken Python code:

from pyspark.sql import SQLContext, Row

sqlContext = SQLContext(sc)

lines = sc.textFile("data.txt")

parts = lines.map(lambda l: l.split(","))

people = parts.map(lambda p: Row(id=p[0], name=p[1]))

schemaPeople = sqlContext.inferSchema(people)

schemaPeople.saveAsTable("peopletable")

saveAsTable fails with Unresolved plan found.
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved plan 
found, tree:
'CreateTableAsSelect None, pytable, false, None


This scala code works fine:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// implicitly convert an RDD of case classes to a SchemaRDD
import sqlContext.createSchemaRDD

case class Person(id: String, name: String)

val lines = sc.textFile("data.txt")

val parts = lines.map(l => l.split(","))

val people = parts.map(p => Person(p(0), p(1)))

people.saveAsTable("peopletable")


Is this a known issue? Or am I not using Python correctly?

Thanks,
Judy


RandomSplit with Spark-ML and Dataframe

2015-05-07 Thread Olivier Girardot
Hi,
is there any best practice for doing, as in MLlib, a randomSplit into
training/cross-validation sets with DataFrames and the pipeline API?

Regards

Olivier.
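
A stopgap sketch that splits at the RDD level and rebuilds DataFrames
(assuming an existing DataFrame df; as far as I can tell, DataFrame itself
has no randomSplit in 1.3):

val Array(trainRdd, cvRdd) = df.rdd.randomSplit(Array(0.8, 0.2), seed = 42L)
val train = sqlContext.createDataFrame(trainRdd, df.schema) // keep the original schema
val cv    = sqlContext.createDataFrame(cvRdd, df.schema)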


Re: java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_2_piece0

2015-05-07 Thread Richard Marscher
I should also add I've recently seen this issue as well when using collect.
I believe in my case it was related to the driver program not having enough
heap space to handle the returned collection.
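
If the collected result is the suspect, two quick things to try (myRdd as in
the thread below; the 4g figure is only an example):

// keep the aggregation distributed instead of materializing it on the driver
val numCats = myRdd.map(t => t._2).distinct().count()
// or, if the array itself is needed, raise the driver heap at launch time,
// e.g. spark-submit --driver-memory 4g ...
val cats = myRdd.map(t => t._2).distinct().collect()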

On Thu, May 7, 2015 at 11:05 AM, Richard Marscher 
wrote:

> By default you would expect to find the logs files for master and workers
> in the relative `logs` directory from the root of the Spark installation on
> each of the respective nodes in the cluster.
>
> On Thu, May 7, 2015 at 10:27 AM, Wang, Ningjun (LNG-NPV) <
> ningjun.w...@lexisnexis.com> wrote:
>
>>  Ø  Can you check your local and remote logs?
>>
>>
>>
>> Where are the log files? I see the following in my Driver program logs as
>> well as the Spark UI failed task page
>>
>>
>>
>> java.io.IOException: org.apache.spark.SparkException: Failed to get
>> broadcast_2_piece0 of broadcast_2
>>
>>
>>
>> Here is the detailed stack trace.
>>
>> 15/05/06 10:48:51 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0,
>> LAB4-WIN03.pcc.lexisnexis.com): java.io.IOException: 
>> org.apache.spark.SparkException:
>> Failed to get broadcast_2_piece0 of broadcast_2
>>
>> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1156)
>>
>> at
>> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
>>
>> at
>> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
>>
>> at
>> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
>>
>> at
>> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
>>
>> at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
>>
>> at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:61)
>>
>> at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>
>> at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>
>> at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>
>> at java.lang.Thread.run(Thread.java:745)
>>
>>
>>
>>
>>
>>
>>
>> Ningjun
>>
>>
>>
>> *From:* Jonathan Coveney [mailto:jcove...@gmail.com]
>> *Sent:* Wednesday, May 06, 2015 5:23 PM
>> *To:* Wang, Ningjun (LNG-NPV)
>> *Cc:* Ted Yu; user@spark.apache.org
>>
>> *Subject:* Re: java.io.IOException: org.apache.spark.SparkException:
>> Failed to get broadcast_2_piece0
>>
>>
>>
>> Can you check your local and remote logs?
>>
>>
>>
>> 2015-05-06 16:24 GMT-04:00 Wang, Ningjun (LNG-NPV) <
>> ningjun.w...@lexisnexis.com>:
>>
>> This problem happen in Spark 1.3.1.  It happen when two jobs are running
>> simultaneously each in its own Spark Context.
>>
>>
>>
>> I don’t remember seeing this bug in Spark 1.2.0. Is it a new bug
>> introduced in Spark 1.3.1?
>>
>>
>>
>> Ningjun
>>
>>
>>
>> *From:* Ted Yu [mailto:yuzhih...@gmail.com]
>> *Sent:* Wednesday, May 06, 2015 11:32 AM
>> *To:* Wang, Ningjun (LNG-NPV)
>> *Cc:* user@spark.apache.org
>> *Subject:* Re: java.io.IOException: org.apache.spark.SparkException:
>> Failed to get broadcast_2_piece0
>>
>>
>>
>> Which release of Spark are you using ?
>>
>>
>>
>> Thanks
>>
>>
>> On May 6, 2015, at 8:03 AM, Wang, Ningjun (LNG-NPV) <
>> ningjun.w...@lexisnexis.com> wrote:
>>
>>  I run a job on spark standalone cluster and got the exception below
>>
>>
>>
>> Here is the line of code that cause problem
>>
>>
>>
>> val myRdd: RDD[(String, String, String)] = … // RDD of (docid,
>> category, path)
>>
>>
>> myRdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
>>
>> val cats: Array[String] = myRdd.map(t => t._2).distinct().collect()
>> // This line causes the exception
>>
>>
>>
>>
>>
>> 15/05/06 10:48:51 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0,
>> LAB4-WIN03.pcc.lexisnexis.com): java.io.IOException: 
>> org.apache.spark.SparkException:
>> Failed to get broadcast_2_piece0 of broadcast_2
>>
>> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1156)
>>
>> at
>> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
>>
>> at
>> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
>>
>> at
>> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
>>
>> at
>> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
>>
>> at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
>>
>> at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:61)
>>
>> at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>
>> at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>
>> at
>> org.apache.spark.execut

Avro to Parquet ?

2015-05-07 Thread ๏̯͡๏
1) What is the best way to convert data from Avro to Parquet so that it can
be later read and processed ?

2) Will the performance of processing (join, reduceByKey) be better if both
datasets are in Parquet format when compared to Avro + Sequence ?

-- 
Deepak


Re: java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_2_piece0

2015-05-07 Thread Richard Marscher
By default you would expect to find the logs files for master and workers
in the relative `logs` directory from the root of the Spark installation on
each of the respective nodes in the cluster.

On Thu, May 7, 2015 at 10:27 AM, Wang, Ningjun (LNG-NPV) <
ningjun.w...@lexisnexis.com> wrote:

>  Ø  Can you check your local and remote logs?
>
>
>
> Where are the log files? I see the following in my Driver program logs as
> well as the Spark UI failed task page
>
>
>
> java.io.IOException: org.apache.spark.SparkException: Failed to get
> broadcast_2_piece0 of broadcast_2
>
>
>
> Here is the detailed stack trace.
>
> 15/05/06 10:48:51 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0,
> LAB4-WIN03.pcc.lexisnexis.com): java.io.IOException: 
> org.apache.spark.SparkException:
> Failed to get broadcast_2_piece0 of broadcast_2
>
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1156)
>
> at
> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
>
> at
> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
>
> at
> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
>
> at
> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
>
> at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
>
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:61)
>
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>
> at org.apache.spark.scheduler.Task.run(Task.scala:64)
>
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>
> at java.lang.Thread.run(Thread.java:745)
>
>
>
>
>
>
>
> Ningjun
>
>
>
> *From:* Jonathan Coveney [mailto:jcove...@gmail.com]
> *Sent:* Wednesday, May 06, 2015 5:23 PM
> *To:* Wang, Ningjun (LNG-NPV)
> *Cc:* Ted Yu; user@spark.apache.org
>
> *Subject:* Re: java.io.IOException: org.apache.spark.SparkException:
> Failed to get broadcast_2_piece0
>
>
>
> Can you check your local and remote logs?
>
>
>
> 2015-05-06 16:24 GMT-04:00 Wang, Ningjun (LNG-NPV) <
> ningjun.w...@lexisnexis.com>:
>
> This problem happen in Spark 1.3.1.  It happen when two jobs are running
> simultaneously each in its own Spark Context.
>
>
>
> I don’t remember seeing this bug in Spark 1.2.0. Is it a new bug
> introduced in Spark 1.3.1?
>
>
>
> Ningjun
>
>
>
> *From:* Ted Yu [mailto:yuzhih...@gmail.com]
> *Sent:* Wednesday, May 06, 2015 11:32 AM
> *To:* Wang, Ningjun (LNG-NPV)
> *Cc:* user@spark.apache.org
> *Subject:* Re: java.io.IOException: org.apache.spark.SparkException:
> Failed to get broadcast_2_piece0
>
>
>
> Which release of Spark are you using ?
>
>
>
> Thanks
>
>
> On May 6, 2015, at 8:03 AM, Wang, Ningjun (LNG-NPV) <
> ningjun.w...@lexisnexis.com> wrote:
>
>  I run a job on spark standalone cluster and got the exception below
>
>
>
> Here is the line of code that cause problem
>
>
>
> val myRdd: RDD[(String, String, String)] = … // RDD of (docid,
> category, path)
>
>
> myRdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
>
> val cats: Array[String] = myRdd.map(t => t._2).distinct().collect()  //
> This line causes the exception
>
>
>
>
>
> 15/05/06 10:48:51 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0,
> LAB4-WIN03.pcc.lexisnexis.com): java.io.IOException: 
> org.apache.spark.SparkException:
> Failed to get broadcast_2_piece0 of broadcast_2
>
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1156)
>
> at
> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
>
> at
> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
>
> at
> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
>
> at
> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
>
> at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
>
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:61)
>
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>
> at org.apache.spark.scheduler.Task.run(Task.scala:64)
>
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>
> at java.lang.Thread.run(Thread.java:745)
>
> Caused by: org.apache.spark.SparkException: Failed to get
> broadcast_2_piece0 of broadcast_2
>
> at
> org.apache.spark.broadcast.T

Re: Spark Job triggers second attempt

2015-05-07 Thread Richard Marscher
Hi,

I think you may want to use this setting?:

spark.task.maxFailures (default: 4): Number of individual task failures before
giving up on the job. Should be greater than or equal to 1. Number of allowed
retries = this value - 1.
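
For example (a minimal sketch; with "1", the first task failure fails the
job, which helps when you want to debug immediately):

val conf = new org.apache.spark.SparkConf()
  .set("spark.task.maxFailures", "1") // no task retries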

On Thu, May 7, 2015 at 2:34 AM, ÐΞ€ρ@Ҝ (๏̯͡๏)  wrote:

> How can I stop Spark from triggering a second attempt in case the first
> fails?
> I do not want to wait for the second attempt to fail again, so that I can
> debug faster.
>
> .set("spark.yarn.maxAppAttempts", "0") OR .set("spark.yarn.maxAppAttempts",
> "1")
>
> is not helping.
>
> --
> Deepak
>
>


Re: Spark 1.3.1 and Parquet Partitions

2015-05-07 Thread Yana Kadiyska
Here is the JIRA:  https://issues.apache.org/jira/browse/SPARK-3928
Looks like for now you'd have to list the full paths... I don't see a
comment from an official Spark committer, so I'm still not sure whether this
is a bug or by design, but it seems to be the current state of affairs.
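
Until that's resolved, a workaround sketch: enumerate the partition
directories yourself (the city values and host are examples) and pass them
all to parquetFile, which accepts multiple paths. Note that the partition
column itself (city) is not recovered this way:

val paths = Seq("London", "NewYork", "Paris")
  .map(c => s"hdfs://host:8020/dataset/city=$c/data.parquet")
val df = sqlContext.parquetFile(paths: _*)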

On Thu, May 7, 2015 at 8:43 AM, yana  wrote:

> I believe this is a regression. Does not work for me either. There is a
> Jira on parquet wildcards which is resolved, I'll see about getting it
> reopened
>
>
> Sent on the new Sprint Network from my Samsung Galaxy S®4.
>
>
>  Original message 
> From: Vaxuki
> Date:05/07/2015 7:38 AM (GMT-05:00)
> To: Olivier Girardot
> Cc: user@spark.apache.org
> Subject: Re: Spark 1.3.1 and Parquet Partitions
>
> Olivier
Nope. Wildcard extensions don't work. I am debugging the code to figure out
what's wrong. I know I am using 1.3.1 for sure.
>
> Pardon typos...
>
> On May 7, 2015, at 7:06 AM, Olivier Girardot  wrote:
>
> "hdfs://some ip:8029/dataset/*/*.parquet" doesn't work for you ?
>
> On Thu, 7 May 2015 at 03:32, vasuki  wrote:
>
>> Spark 1.3.1 -
>> i have a parquet file on hdfs partitioned by some string looking like this
>> /dataset/city=London/data.parquet
>> /dataset/city=NewYork/data.parquet
>> /dataset/city=Paris/data.paruqet
>> ….
>>
>> I am trying to load it using sqlContext, via
>> sqlContext.parquetFile(
>> "hdfs://some ip:8029/dataset/< what do i put here >
>>
>> No leads so far. Is there a way I can load the partitions? I am running on a
>> cluster and not locally.
>> -V
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-3-1-and-Parquet-Partitions-tp22792.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>


RE: saveAsTable fails on Python with "Unresolved plan found"

2015-05-07 Thread Judy Nash
SPARK-4825 looks like the 
right bug, but it should've been fixed in 1.2.1.

Is a similar fix needed in Python?

From: Judy Nash
Sent: Thursday, May 7, 2015 7:26 AM
To: user@spark.apache.org
Subject: saveAsTable fails on Python with "Unresolved plan found"

Hello,

I am following the tutorial code on sql programming 
guide
 to try out Python on spark 1.2.1.

SaveAsTable function works in Scala but fails in Python with "Unresolved plan 
found".

Broken Python code:

from pyspark.sql import SQLContext, Row

sqlContext = SQLContext(sc)

lines = sc.textFile("data.txt")

parts = lines.map(lambda l: l.split(","))

people = parts.map(lambda p: Row(id=p[0], name=p[1]))

schemaPeople = sqlContext.inferSchema(people)

schemaPeople.saveAsTable("peopletable")

saveAsTable fails with Unresolved plan found.
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved plan 
found, tree:
'CreateTableAsSelect None, pytable, false, None


This scala code works fine:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// implicitly convert an RDD of case classes to a SchemaRDD
import sqlContext.createSchemaRDD

case class Person(id: String, name: String)

val lines = sc.textFile("data.txt")

val parts = lines.map(l => l.split(","))

val people = parts.map(p => Person(p(0), p(1)))

people.saveAsTable("peopletable")


Is this a known issue? Or am I not using Python correctly?

Thanks,
Judy


RE: java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_2_piece0

2015-05-07 Thread Wang, Ningjun (LNG-NPV)
Ø  Can you check your local and remote logs?

Where are the log files? I see the following in my Driver program logs as well 
as the Spark UI failed task page

java.io.IOException: org.apache.spark.SparkException: Failed to get 
broadcast_2_piece0 of broadcast_2

Here is the detailed stack trace.
15/05/06 10:48:51 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 
LAB4-WIN03.pcc.lexisnexis.com): 
java.io.IOException: org.apache.spark.SparkException: Failed to get 
broadcast_2_piece0 of broadcast_2
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1156)
at 
org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
at 
org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
at 
org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
at 
org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:61)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)



Ningjun

From: Jonathan Coveney [mailto:jcove...@gmail.com]
Sent: Wednesday, May 06, 2015 5:23 PM
To: Wang, Ningjun (LNG-NPV)
Cc: Ted Yu; user@spark.apache.org
Subject: Re: java.io.IOException: org.apache.spark.SparkException: Failed to 
get broadcast_2_piece0

Can you check your local and remote logs?

2015-05-06 16:24 GMT-04:00 Wang, Ningjun (LNG-NPV) 
mailto:ningjun.w...@lexisnexis.com>>:
This problem happen in Spark 1.3.1.  It happen when two jobs are running 
simultaneously each in its own Spark Context.

I don’t remember seeing this bug in Spark 1.2.0. Is it a new bug introduced in 
Spark 1.3.1?

Ningjun

From: Ted Yu [mailto:yuzhih...@gmail.com]
Sent: Wednesday, May 06, 2015 11:32 AM
To: Wang, Ningjun (LNG-NPV)
Cc: user@spark.apache.org
Subject: Re: java.io.IOException: org.apache.spark.SparkException: Failed to 
get broadcast_2_piece0

Which release of Spark are you using ?

Thanks

On May 6, 2015, at 8:03 AM, Wang, Ningjun (LNG-NPV) 
mailto:ningjun.w...@lexisnexis.com>> wrote:
I run a job on spark standalone cluster and got the exception below

Here is the line of code that cause problem

val myRdd: RDD[(String, String, String)] = … // RDD of (docid, category, path)

myRdd.persist(StorageLevel.MEMORY_AND_DISK_SER)

val cats: Array[String] = myRdd.map(t => t._2).distinct().collect()  // This 
line causes the exception


15/05/06 10:48:51 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 
LAB4-WIN03.pcc.lexisnexis.com): 
java.io.IOException: org.apache.spark.SparkException: Failed to get 
broadcast_2_piece0 of broadcast_2
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1156)
at 
org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
at 
org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
at 
org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
at 
org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:61)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Failed to get broadcast_2_piece0 of 
broadcast_2
at 
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBr
oadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
at 
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBr
oadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
at scala.Option.getOrElse(Option.scala:120)
at 
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBr
oadcast$$readBlocks$1.apply$mcVI$sp(T

saveAsTable fails on Python with "Unresolved plan found"

2015-05-07 Thread Judy Nash
Hello,

I am following the tutorial code on sql programming 
guide
 to try out Python on spark 1.2.1.

SaveAsTable function works in Scala but fails in Python with "Unresolved plan 
found".

Broken Python code:

from pyspark.sql import SQLContext, Row

sqlContext = SQLContext(sc)

lines = sc.textFile("data.txt")

parts = lines.map(lambda l: l.split(","))

people = parts.map(lambda p: Row(id=p[0], name=p[1]))

schemaPeople = sqlContext.inferSchema(people)

schemaPeople.saveAsTable("peopletable")

saveAsTable fails with Unresolved plan found.
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved plan 
found, tree:
'CreateTableAsSelect None, pytable, false, None


This scala code works fine:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// implicitly convert an RDD of case classes to a SchemaRDD
import sqlContext.createSchemaRDD

case class Person(id: String, name: String)

val lines = sc.textFile("data.txt")

val parts = lines.map(l => l.split(","))

val people = parts.map(p => Person(p(0), p(1)))

people.saveAsTable("peopletable")

Is this a known issue? Or am I not using Python correctly?

Thanks,
Judy


User Defined Type (UDT)

2015-05-07 Thread wjur
Hi all!

I'm using Spark 1.3.0 and I'm struggling with the definition of a new type for
a project I'm working on. I've created a case class Person(name: String) and
now I'm trying to make Spark able to serialize and deserialize the
defined type. I made a couple of attempts but none of them worked 100%
(there were issues either in serialization or deserialization). 

This is my class and the corresponding UDT.

@SQLUserDefinedType(udt = classOf[PersonUDT])
case class Person(name: String)

class PersonUDT extends UserDefinedType[Person] {
  override def sqlType: DataType = StructType(Seq(StructField("name",
StringType)))

  override def serialize(obj: Any): Seq[Any] = {
obj match {
  case c: Person =>
Seq(c.name)
}
  }

  override def userClass: Class[Person] = classOf[Person]

  override def deserialize(datum: Any): Person = {
datum match {
  case values: Seq[_] =>
assert(values.length == 1)
Person(values.head.asInstanceOf[String])
  case values: util.ArrayList[_] =>
Person(values.get(0).asInstanceOf[String])
}
  }
  
  // In some other attempt I was creating RDD of Seq with manually
serialized data and 
  // I had to override equals because two DFs with the same type weren't
actually equal
  // StructField(person,...types.PersonUDT@a096ac3)
  // StructField(person,...types.PersonUDT@613fd937)
  def canEqual(other: Any): Boolean = other.isInstanceOf[PersonUDT]
 
  override def equals(other: Any): Boolean = other match {
case that: PersonUDT => true
case _ => false
  }

  override def hashCode(): Int = 1
}

This is how I create RDD of Person and then try to create a DataFrame
val rdd = sparkContext.parallelize((1 to 100).map(i => Person(i.toString)))
val sparkDataFrame = sqlContext.createDataFrame(rdd)

The second line throws an exception:
java.lang.ClassCastException: types.PersonUDT cannot be cast to
org.apache.spark.sql.types.StructType 
at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:316)

I looked into the code in SQLContext.scala and it seems that the code
requires the UDT to extend StructType, but in fact it extends
UserDefinedType, which directly extends DataType.
I'm not sure whether it is a bug or I just don't know how to use UDTs.

Do you have any suggestions how to solve this? I based my UDT on
ExamplePointUDT but it seems to be incorrect. Is there a working example for
UDT?


Thank you for the reply in advance!
wjur
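
One hedged workaround sketch for later readers: the top-level row type passed
to createDataFrame must map to a StructType, so the UDT-annotated class can be
used as a field of a wrapper case class rather than as the row type itself
(Wrapper is a made-up name, and this is untested):

case class Wrapper(person: Person)

val rdd = sparkContext.parallelize((1 to 100).map(i => Wrapper(Person(i.toString))))
val df = sqlContext.createDataFrame(rdd) // schema: a StructType with one UDT column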



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/User-Defined-Type-UDT-tp22796.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark does not delete temporary directories

2015-05-07 Thread Ted Yu
Default value for spark.worker.cleanup.enabled is false:

private val CLEANUP_ENABLED =
conf.getBoolean("spark.worker.cleanup.enabled", false)

I wonder if the default should be set as true.

Cheers

On Thu, May 7, 2015 at 6:19 AM, Todd Nist  wrote:

> Have you tried to set the following?
>
> spark.worker.cleanup.enabled=true
> spark.worker.cleanup.appDataTtl=”
>
>
>
> On Thu, May 7, 2015 at 2:39 AM, Taeyun Kim 
> wrote:
>
>> Hi,
>>
>>
>>
>> After a spark program completes, there are 3 temporary directories remain
>> in the temp directory.
>>
>> The file names are like this: spark-2e389487-40cc-4a82-a5c7-353c0feefbb7
>>
>>
>>
>> And the Spark program runs on Windows, a snappy DLL file also remains in
>> the temp directory.
>>
>> The file name is like this:
>> snappy-1.0.4.1-6e117df4-97b6-4d69-bf9d-71c4a627940c-snappyjava
>>
>>
>>
>> They are created every time the Spark program runs. So the number of
>> files and directories keeps growing.
>>
>>
>>
>> How can I let them be deleted?
>>
>>
>>
>> Spark version is 1.3.1 with Hadoop 2.6.
>>
>>
>>
>> Thanks.
>>
>>
>>
>>
>>
>
>


Re: reduceByKeyAndWindow, but using log timestamps instead of clock seconds

2015-05-07 Thread allonsy
Has there been any follow up on this topic?

Here there were suggestions that someone was going to publish some code,
but no news since (TD himself looked pretty interested).

Did anybody come up with something in the last months?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/reduceByKeyAndWindow-but-using-log-timestamps-instead-of-clock-seconds-tp21405p22795.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Sort (order by) of the big dataset

2015-05-07 Thread Ted Yu
Where can I find your blog ?

Thanks



> On Apr 29, 2015, at 7:14 PM, Ulanov, Alexander  
> wrote:
> 
> After a day of debugging (actually, more), I can answer my own question:
> The problem is that the default value 200 of “spark.sql.shuffle.partitions” 
> is too small for sorting 2B rows. It was hard to realize because Spark 
> executors just crash with various exceptions one by one. The other takeaway 
> is that Dataframe “order by” and RDD.sortBy are implemented in different 
> ways. BTW., why?
>  
> Small synthetic test (copied from my blog):
> Create 2B rows of MyRecord within 2000 partitions, so each partition will 
> have 1M of rows.
> import sqlContext.implicits._
> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
> case class MyRecord(time: Double, id: String)
> val rdd = sc.parallelize(1 to 200, 200).flatMap(x => 
> Seq.fill(1000)(MyRecord(util.Random.nextDouble, "xxx")))
>  
> Lets sort this RDD by time:
> val sorted = rdd.sortBy(x => x.time)
> sorted.count
>  
> It finished in about 8 minutes on my cluster of 8 nodes. Everything's fine. 
> You can also check tasks that were completed in Spark web UI. The number of 
> reducers was equal to the number of partitions, i.e. 2000
>  
> Lets convert the original RDD to Dataframe and sort again:
> val df = sqlContext.createDataFrame(rdd)
> df.registerTempTable("data")
> val result = sqlContext.sql("select * from data order by time")
> result.count
>  
> It will run for a while and then crash. If you check tasks in the Spark Web 
> UI, you will see that some of them were cancelled due to lost executors 
> (ExecutorLost) due to some strange Exceptions. It is really hard to trace 
> back which executor was the first to be lost. The others follow it as in a house of 
> cards. What's the problem? The number of reducers. For the first task it is 
> equal to the number of partitions, i.e. 2000, but for the second it switched 
> to 200.
>  
> From: Ulanov, Alexander 
> Sent: Wednesday, April 29, 2015 1:08 PM
> To: user@spark.apache.org
> Subject: Sort (order by) of the big dataset
>  
> Hi,
>  
> I have a 2 billion records dataset with schema <eventId: Double, time: Double, value: Double>. It is stored in Parquet format in HDFS, size 23GB. 
> Specs: Spark 1.3, Hadoop 1.2.1, 8 nodes with Xeon 16GB RAM, 1TB disk space, 
> each node has 3 workers with 3GB memory.
>  
> I keep failing to sort the mentioned dataset in Spark. I do the following:
> val pf = sqlContext.parquetFile("hdfs://my.net/data.parquet")
> pf.registerTempTable("data")
> val sorted = sqlContext.sql("select * from data order by time")
> sorted.saveAsParquetFile("hdfs://my.net/data-sorted.parquet")
>  
> Spark starts to execute tasks and then errors like “Executor Lost” pop up in 
> the web UI (task mapPartitions at Exchange.scala and runJob at 
> newParquet.scala), giving different types of Exceptions in explanation. My 
> thinking is that the main problem is with “GC overhead limit” exception 
> however I observe exceptions related to connection time out and shuffling 
> write (java.lang.IllegalStateException: Shutdown in progress; 
> org.apache.spark.shuffle.FetchFailedException: java.io.FileNotFoundException 
> ).
>  
> What I tried:
> 1) Tried to “order by eventId” and “order by eventId, time” with the 
> same result.
> 2) Looked at the shuffle parameters, but the defaults do make sense.
> 3) Tried to repartition the data I am loading from Parquet: val pf3000 = 
> pf.repartition(3000) in order to get smaller chunks of data passed to 
> executors (originally there are 300 partitions). It did not help either. 
> Surprisingly this dataset takes 50GB on hdfs versus the 23GB of the 
> original.
> 4) Tried to have 8 workers instead of 24 and gave them 10G of memory. It did 
> not help.
>  
> Could you suggest what might be the problem and what is the workaround? Just 
> in case, I cannot have more RAM or more machines ☺
>  
> Best  regards, Alexander
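
A minimal sketch of the fix described above: raise the SQL shuffle partition
count before running the "order by" (2000 matches the partition count used in
the synthetic test):

sqlContext.setConf("spark.sql.shuffle.partitions", "2000")
val result = sqlContext.sql("select * from data order by time")
result.count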


Re: Spark does not delete temporary directories

2015-05-07 Thread Todd Nist
Have you tried to set the following?

spark.worker.cleanup.enabled=true
spark.worker.cleanup.appDataTtl=”



On Thu, May 7, 2015 at 2:39 AM, Taeyun Kim 
wrote:

> Hi,
>
>
>
> After a spark program completes, there are 3 temporary directories remain
> in the temp directory.
>
> The file names are like this: spark-2e389487-40cc-4a82-a5c7-353c0feefbb7
>
>
>
> And the Spark program runs on Windows, a snappy DLL file also remains in
> the temp directory.
>
> The file name is like this:
> snappy-1.0.4.1-6e117df4-97b6-4d69-bf9d-71c4a627940c-snappyjava
>
>
>
> They are created every time the Spark program runs. So the number of files
> and directories keeps growing.
>
>
>
> How can I let them be deleted?
>
>
>
> Spark version is 1.3.1 with Hadoop 2.6.
>
>
>
> Thanks.
>
>
>
>
>


Re: YARN mode startup takes too long (10+ secs)

2015-05-07 Thread Zoltán Zvara
Without considering everything, just a few hints:
You are running on YARN. From 09:18:34 to 09:18:37 your application is in
state ACCEPTED. There is a noticeable overhead introduced due to
communicating with YARN's ResourceManager, NodeManager and given that the
YARN scheduler needs time to make a decision. I guess somewhere from 09:18:38
to 09:18:43 your application JAR gets copied to another container requested
by the Spark ApplicationMaster deployed on YARN's container 0. Deploying an
executor needs further resource negotiations with the ResourceManager
usually. Also, as I said, your JAR and the Executor's code require copying to
the container's local directory; execution is blocked until that is complete.

On Thu, May 7, 2015 at 3:09 AM Taeyun Kim 
wrote:

> Hi,
>
>
>
> I’m running a spark application with YARN-client or YARN-cluster mode.
>
> But it seems to take too long to startup.
>
> It takes 10+ seconds to initialize the spark context.
>
> Is this normal? Or can it be optimized?
>
>
>
> The environment is as follows:
>
> - Hadoop: Hortonworks HDP 2.2 (Hadoop 2.6)
>
> - Spark: 1.3.1
>
> - Client: Windows 7, but similar result on CentOS 6.6
>
>
>
> The following is the startup part of the application log. (Some private
> information was edited)
>
> ‘Main: Initializing context’ at the first line and ‘MainProcessor:
> Deleting previous output files’ at the last line are the logs by the
> application. Others in between are from Spark itself. Application logic is
> executed after this log is displayed.
>
>
>
> ---
>
>
>
> 15/05/07 09:18:31 INFO Main: Initializing context
>
> 15/05/07 09:18:31 INFO SparkContext: Running Spark version 1.3.1
>
> 15/05/07 09:18:31 INFO SecurityManager: Changing view acls to: myuser,myapp
>
> 15/05/07 09:18:31 INFO SecurityManager: Changing modify acls to:
> myuser,myapp
>
> 15/05/07 09:18:31 INFO SecurityManager: SecurityManager: authentication
> disabled; ui acls disabled; users with view permissions: Set(myuser,
> myapp); users with modify permissions: Set(myuser, myapp)
>
> 15/05/07 09:18:31 INFO Slf4jLogger: Slf4jLogger started
>
> 15/05/07 09:18:31 INFO Remoting: Starting remoting
>
> 15/05/07 09:18:31 INFO Remoting: Remoting started; listening on addresses
> :[akka.tcp://sparkDriver@mymachine:54449]
>
> 15/05/07 09:18:31 INFO Utils: Successfully started service 'sparkDriver'
> on port 54449.
>
> 15/05/07 09:18:31 INFO SparkEnv: Registering MapOutputTracker
>
> 15/05/07 09:18:32 INFO SparkEnv: Registering BlockManagerMaster
>
> 15/05/07 09:18:32 INFO DiskBlockManager: Created local directory at
> C:\Users\myuser\AppData\Local\Temp\spark-2d3db9d6-ea78-438e-956f-be9c1dcf3a9d\blockmgr-e9ade223-a4b8-4d9f-b038-efd66adf9772
>
> 15/05/07 09:18:32 INFO MemoryStore: MemoryStore started with capacity
> 1956.7 MB
>
> 15/05/07 09:18:32 INFO HttpFileServer: HTTP File server directory is
> C:\Users\myuser\AppData\Local\Temp\spark-ff40d73b-e8ab-433e-88c4-35da27fb6278\httpd-def9220f-ac3a-4dd2-9ac1-2c593b94b2d9
>
> 15/05/07 09:18:32 INFO HttpServer: Starting HTTP Server
>
> 15/05/07 09:18:32 INFO Server: jetty-8.y.z-SNAPSHOT
>
> 15/05/07 09:18:32 INFO AbstractConnector: Started
> SocketConnector@0.0.0.0:54450
>
> 15/05/07 09:18:32 INFO Utils: Successfully started service 'HTTP file
> server' on port 54450.
>
> 15/05/07 09:18:32 INFO SparkEnv: Registering OutputCommitCoordinator
>
> 15/05/07 09:18:32 INFO Server: jetty-8.y.z-SNAPSHOT
>
> 15/05/07 09:18:32 INFO AbstractConnector: Started
> SelectChannelConnector@0.0.0.0:4040
>
> 15/05/07 09:18:32 INFO Utils: Successfully started service 'SparkUI' on
> port 4040.
>
> 15/05/07 09:18:32 INFO SparkUI: Started SparkUI at http://mymachine:4040
>
> 15/05/07 09:18:32 INFO SparkContext: Added JAR
> file:/D:/Projects/MyApp/MyApp.jar at
> http://10.111.111.199:54450/jars/MyApp.jar with timestamp 1430957912240
>
> 15/05/07 09:18:32 INFO RMProxy: Connecting to ResourceManager at cluster01/
> 10.111.111.11:8050
>
> 15/05/07 09:18:32 INFO Client: Requesting a new application from cluster
> with 3 NodeManagers
>
> 15/05/07 09:18:32 INFO Client: Verifying our application has not requested
> more than the maximum memory capability of the cluster (23040 MB per
> container)
>
> 15/05/07 09:18:32 INFO Client: Will allocate AM container, with 896 MB
> memory including 384 MB overhead
>
> 15/05/07 09:18:32 INFO Client: Setting up container launch context for our
> AM
>
> 15/05/07 09:18:32 INFO Client: Preparing resources for our AM container
>
> 15/05/07 09:18:32 INFO Client: Source and destination file systems are the
> same. Not copying
> hdfs://cluster01/apps/spark/spark-assembly-1.3.1-hadoop2.6.0.jar
>
> 15/05/07 09:18:32 INFO Client: Setting up the launch environment for our
> AM container
>
> 15/05/07 09:18:33 INFO SecurityManager: Changing view acls to: myuser,myapp
>
> 15/05/07 09:18:33 INFO SecurityManager: Changing modify acls to:
> myuser,myapp
>
> 15/05/07 09:18:33 INFO SecurityManager: SecurityManager: authentication
> disabled; ui acls disabled; user

Re: Spark 1.3.1 and Parquet Partitions

2015-05-07 Thread yana
I believe this is a regression. Does not work for me either. There is a Jira on 
parquet wildcards which is resolved, I'll see about getting it reopened


Sent on the new Sprint Network from my Samsung Galaxy S®4.

 Original message From: Vaxuki 
 Date:05/07/2015  7:38 AM  (GMT-05:00) 
To: Olivier Girardot  Cc: 
user@spark.apache.org Subject: Re: Spark 1.3.1 and Parquet 
Partitions 
Olivier 
Nope. Wildcard extensions don't work. I am debugging the code to figure out 
what's wrong. I know I am using 1.3.1 for sure.

Pardon typos...

On May 7, 2015, at 7:06 AM, Olivier Girardot  wrote:

"hdfs://some ip:8029/dataset/*/*.parquet" doesn't work for you ?

On Thu, 7 May 2015 at 03:32, vasuki  wrote:
Spark 1.3.1 -
i have a parquet file on hdfs partitioned by some string looking like this
/dataset/city=London/data.parquet
/dataset/city=NewYork/data.parquet
/dataset/city=Paris/data.paruqet
….

I am trying to load it using sqlContext, via sqlContext.parquetFile(
"hdfs://some ip:8029/dataset/< what do i put here >

No leads so far. Is there a way I can load the partitions? I am running on a
cluster and not locally.
-V



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-3-1-and-Parquet-Partitions-tp22792.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Sort (order by) of the big dataset

2015-05-07 Thread Night Wolf
What was the answer? Was it only setting spark.sql.shuffle.partitions?

On Thu, Apr 30, 2015 at 12:14 PM, Ulanov, Alexander  wrote:

>  After a day of debugging (actually, more), I can answer my own question:
>
> The problem is that the default value 200 of
> “spark.sql.shuffle.partitions” is too small for sorting 2B rows. It was
> hard to realize because Spark executors just crash with various exceptions
> one by one. The other takeaway is that Dataframe “order by” and RDD.sortBy
> are implemented in different ways. BTW., why?
>
>
>
> Small synthetic test (copied from my blog):
>
> Create 2B rows of MyRecord within 2000 partitions, so each partition will
> have 1M of rows.
>
> import sqlContext.implicits._
>
> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>
> case class MyRecord(time: Double, id: String)
>
> val rdd = sc.parallelize(1 to 200, 200).flatMap(x =>
> Seq.fill(1000)(MyRecord(util.Random.nextDouble, "xxx")))
>
>
>
> Lets sort this RDD by time:
>
> val sorted = rdd.sortBy(x => x.time)
>
> sorted.count
>
>
>
> It finished in about 8 minutes on my cluster of 8 nodes. Everything's
> fine. You can also check tasks that were completed in Spark web UI. The
> number of reducers was equal to the number of partitions, i.e. 2000
>
>
>
> Lets convert the original RDD to Dataframe and sort again:
>
> val df = sqlContext.createDataFrame(rdd)
>
> df.registerTempTable("data")
>
> val result = sqlContext.sql("select * from data order by time")
>
> result.count
>
>
>
> It will run for a while and then crash. If you check tasks in the Spark
> Web UI, you will see that some of them were cancelled due to lost executors
> (ExecutorLost) due to some strange Exceptions. It is really hard to trace
> back which executor was first to be lost. The other follow it as in house
> of cards. What's the problem? The number of reducers. For the first task it
> is equal to the number of partitions, i.e. 2000, but for the second it
> switched to 200.
>
>
>
> *From:* Ulanov, Alexander
> *Sent:* Wednesday, April 29, 2015 1:08 PM
> *To:* user@spark.apache.org
> *Subject:* Sort (order by) of the big dataset
>
>
>
> Hi,
>
>
>
> I have a 2 billion records dataset with schema <eventId: Double, time: Double, value: Double>. It is stored in Parquet format in HDFS, size 23GB.
> Specs: Spark 1.3, Hadoop 1.2.1, 8 nodes with Xeon 16GB RAM, 1TB disk space,
> each node has 3 workers with 3GB memory.
>
>
>
> I keep failing to sort the mentioned dataset in Spark. I do the following:
>
> val pf = sqlContext.parquetFile("hdfs://my.net/data.parquet")
>
> pf.registerTempTable("data")
>
> val sorted = sqlContext.sql("select * from data order by time")
>
> sorted.saveAsParquetFile("hdfs://my.net/data-sorted.parquet")
>
>
>
> Spark starts to execute tasks and then errors like “Executor Lost” pop up
> in the web UI (task mapPartitions at Exchange.scala and runJob at
> newParquet.scala), giving different types of Exceptions in explanation. My
> thinking is that the main problem is with “GC overhead limit” exception
> however I observe exceptions related to connection time out and shuffling
> write (java.lang.IllegalStateException: Shutdown in progress;
> org.apache.spark.shuffle.FetchFailedException:
> java.io.FileNotFoundException ).
>
>
>
> What I tried:
>
> 1) Tried to “order by eventId” and “order by eventId, time” with
> the same result.
>
> 2) Looked at the shuffle parameters, but the defaults do make sense.
>
> 3) Tried to repartition the data I am loading from Parquet: val pf3000 =
> pf.repartition(3000) in order to get smaller chunks of data passed to
> executors (originally there are 300 partitions). It did not help either.
> Surprisingly this dataset takes 50GB on hdfs versus the 23GB of the
> original.
>
> 4) Tried to have 8 workers instead of 24 and gave them 10G of memory. It
> did not help.
>
>
>
> Could you suggest what might be the problem and what is the workaround?
> Just in case, I cannot have more RAM or more machines ☺
>
>
>
> Best  regards, Alexander
>


Re: Parquet number of partitions

2015-05-07 Thread Eric Eijkelenboom
Funny enough, I observe different behaviour on EC2 vs EMR (Spark on EMR 
installed with 
https://github.com/awslabs/emr-bootstrap-actions/tree/master/spark 
). Both 
with Spark 1.3.1/Hadoop 2.

Reading a folder with 12 Parquet files gives me the following:

On EC2: 
scala> val logs = sqlContext.parquetFile(“s3n://mylogs/”)
...
scala> logs.rdd.partitions.length
15/05/07 11:45:50 INFO ParquetRelation2: Reading 100.0% of partitions
15/05/07 11:45:51 INFO MemoryStore: ensureFreeSpace(125716) called with 
curMem=0, maxMem=278302556
15/05/07 11:45:51 INFO MemoryStore: Block broadcast_0 stored as values in 
memory (estimated size 122.8 KB, free 265.3 MB)
15/05/07 11:45:51 INFO MemoryStore: ensureFreeSpace(19128) called with 
curMem=125716, maxMem=278302556
15/05/07 11:45:51 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in 
memory (estimated size 18.7 KB, free 265.3 MB)
15/05/07 11:45:51 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 
ip-10-31-82-233.ec2.internal:39894 (size: 18.7 KB, free: 265.4 MB)
15/05/07 11:45:51 INFO BlockManagerMaster: Updated info of block 
broadcast_0_piece0
15/05/07 11:45:51 INFO SparkContext: Created broadcast 0 from NewHadoopRDD at 
newParquet.scala:478
15/05/07 11:45:51 INFO ParquetRelation2$$anon$1$$anon$2: Using Task Side 
Metadata Split Strategy
res0: Int = 12

On EMR:
scala> val logs = sqlContext.parquetFile(“s3n://mylogs/”)
...
scala> logs.rdd.partitions.length
15/05/07 11:46:53 INFO parquet.ParquetRelation2: Reading 100.0% of partitions
15/05/07 11:46:53 INFO storage.MemoryStore: ensureFreeSpace(266059) called with 
curMem=287247, maxMem=6667936727
15/05/07 11:46:53 INFO storage.MemoryStore: Block broadcast_1 stored as values 
in memory (estimated size 259.8 KB, free 6.2 GB)
15/05/07 11:46:53 INFO storage.MemoryStore: ensureFreeSpace(21188) called with 
curMem=553306, maxMem=6667936727
15/05/07 11:46:53 INFO storage.MemoryStore: Block broadcast_1_piece0 stored as 
bytes in memory (estimated size 20.7 KB, free 6.2 GB)
15/05/07 11:46:53 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in 
memory on ip-10-203-174-61.ec2.internal:52570 (size: 20.7 KB, free: 6.2 GB)
15/05/07 11:46:53 INFO storage.BlockManagerMaster: Updated info of block 
broadcast_1_piece0
15/05/07 11:46:53 INFO spark.SparkContext: Created broadcast 1 from 
NewHadoopRDD at newParquet.scala:478
15/05/07 11:46:54 INFO parquet.ParquetRelation2$$anon$1$$anon$2: Using Task 
Side Metadata Split Strategy
res3: Int = 138

138 (!) partitions on EMR and 12 partitions on EC2 (same as number of files). 
I’m reading from the exact same folder on S3.

This leads me to believe that there might be some configuration settings which 
control how partitioning happens. Could that be the case?
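
One guess worth testing (unverified; EMR may simply ship different Hadoop
defaults): split counts follow the Hadoop input-split math, and on s3n the
"block size" is just the fs.s3n.block.size setting. Raising it should collapse
splits back toward one per file:

sc.hadoopConfiguration.setLong("fs.s3n.block.size", 1024L * 1024 * 1024) // 1 GB
val logs = sqlContext.parquetFile("s3n://mylogs/")
logs.rdd.partitions.length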

Insights would be greatly appreciated. 

Best, Eric



> On 07 May 2015, at 09:31, Archit Thakur  wrote:
> 
> Hi.
> The no. of partitions is determined by the RDD used in the plan it creates. 
> It uses NewHadoopRDD, which gets its partitions from getSplits of the input 
> format it is using: FilteringParquetRowInputFormat, a subclass of 
> ParquetInputFormat. To change the no. of partitions, write a new input format 
> and make the NewHadoopRDD use it; or, if you are ready to shuffle, you can use 
> the repartition API without changing code.
> 
> Thanks & Regards.
> 
> On Tue, May 5, 2015 at 7:56 PM, Masf  > wrote:
> Hi Eric.
> 
> Q1:
> When I read parquet files, I've tested that Spark generates as many 
> partitions as there are parquet files in the path.
> 
> Q2:
> To reduce the number of partitions you can use rdd.repartition(x), where x is 
> the number of partitions. Depending on your case, repartition could be a heavy task
> 
> 
> Regards.
> Miguel.
> 
> On Tue, May 5, 2015 at 3:56 PM, Eric Eijkelenboom 
> mailto:eric.eijkelenb...@gmail.com>> wrote:
> Hello guys
> 
> Q1: How does Spark determine the number of partitions when reading a Parquet 
> file?
> 
> val df = sqlContext.parquetFile(path)
> 
> Is it some way related to the number of Parquet row groups in my input?
> 
> Q2: How can I reduce this number of partitions? Doing this:
> 
> df.rdd.coalesce(200).count
> 
> from the spark-shell causes job execution to hang…
> 
> Any ideas? Thank you in advance.
> 
> Eric
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> 
> For additional commands, e-mail: user-h...@spark.apache.org 
> 
> 
> 
> 
> 
> -- 
> 
> 
> Saludos.
> Miguel Ángel
> 



Re: Partition Case Class RDD without ParRDDFunctions

2015-05-07 Thread Night Wolf
MyClass is a basic scala case class (using Spark 1.3.1);

case class Result(crn: Long, pid: Int, promoWk: Int, windowKey: Int, ipi: Double) {
  override def hashCode(): Int = crn.hashCode()
}
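
For reference, a sketch of the pair-based route (the partition count is an
arbitrary example): keying by the same field that hashCode uses keeps the
intended co-location, and .values is a narrow map, so the physical layout
survives even though the partitioner metadata is dropped:

import org.apache.spark.HashPartitioner

val results = sc.parallelize(Seq(
  Result(1L, 1, 1, 1, 0.5), Result(2L, 1, 1, 1, 0.7)))
val partitioned = results
  .map(r => (r.crn, r))                  // key by the hashCode field
  .partitionBy(new HashPartitioner(200)) // 200 partitions: arbitrary
  .values                                // back to RDD[Result]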


On Wed, May 6, 2015 at 8:09 PM, ayan guha  wrote:

> What does your MyClass look like? I was experimenting with the Row class in
> Python and apparently partitionBy automatically takes the first column as the key.
> However, I am not sure how you can access part of an object without
> deserializing it (either explicitly or Spark doing it for you).
>
> On Wed, May 6, 2015 at 7:14 PM, Night Wolf  wrote:
>
>> Hi,
>>
>> If I have an RDD[MyClass] and I want to partition it by the hash code of
>> MyClass for performance reasons, is there any way to do this without
>> converting it into a PairRDD RDD[(K,V)] and calling partitionBy???
>>
>> Mapping it to a tuple2 seems like a waste of space/computation.
>>
>> It looks like PairRDDFunctions.partitionBy() uses a
>> ShuffleRDD[K,V,C], which requires K, V, C? Could I create a new
>> ShuffleRDD[MyClass,MyClass,MyClass](caseClassRdd, new HashPartitioner)?
>>
>> Cheers,
>> N
>>
>
>
>
> --
> Best Regards,
> Ayan Guha
>


Re: Spark 1.3.1 and Parquet Partitions

2015-05-07 Thread Vaxuki
Olivier,
Nope, wildcards don't work. I am debugging the code to figure out what's
wrong; I know I am using 1.3.1 for sure.

Pardon typos...
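
For what it's worth, a hedged sketch of the partition-discovery route in 1.3.x (no wildcards; the namenode host/port are placeholders):

// Pointing parquetFile at the dataset root should pick up the city=...
// directories and surface `city` as a column derived from the paths.
val df = sqlContext.parquetFile("hdfs://namenode:8020/dataset")
df.printSchema()                           // should include a `city` column
df.filter(df("city") === "London").count() // prunes to the London directory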

> On May 7, 2015, at 7:06 AM, Olivier Girardot  wrote:
> 
> "hdfs://some ip:8029/dataset/*/*.parquet" doesn't work for you ?
> 
>> On Thu, May 7, 2015 at 03:32, vasuki  wrote:
>> Spark 1.3.1 -
>> i have a parquet file on hdfs partitioned by some string looking like this
>> /dataset/city=London/data.parquet
>> /dataset/city=NewYork/data.parquet
>> /dataset/city=Paris/data.paruqet
>> ….
>> 
>> I am trying to load it using sqlContext.parquetFile(
>> "hdfs://some ip:8029/dataset/< what do i put here >")
>> 
>> No leads so far. Is there a way I can load the partitions? I am running on
>> a cluster and not locally.
>> -V
>> 
>> 
>> 


Not maximum CPU usage

2015-05-07 Thread Krever
I have a small application configured to use 6 CPU cores and I run it on a
standalone cluster. Such a configuration means that only 6 tasks can be
active at any moment, and if all of them are waiting (on IO, for example)
then the CPU is not fully used.

My questions:
1. Is it true that the number of active tasks per executor is equal to the
number of cores available to that executor? If so, it means that by
increasing parallelism we only get smaller tasks, but do not increase the
number of threads in use. By active tasks I mean the number visible at
webUI/executors. Or maybe waiting threads are not included in "active
tasks"?

2. Is it good practice to overcommit CPU cores if we know that waiting is a
significant part of our tasks?
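
As far as I know, the answer to 1 is yes: each executor runs at most one task per advertised core, and a task blocked on IO still holds its slot. For 2, a hedged sketch of the overcommit route on standalone (this assumes each worker's conf/spark-env.sh also sets SPARK_WORKER_CORES above the physical count, e.g. SPARK_WORKER_CORES=12 on a 6-core box):

import org.apache.spark.{SparkConf, SparkContext}

// Cores are scheduler slots, not hardware threads, so requesting 12 slots
// on 6 physical cores lets up to 12 tasks overlap their IO waits.
val conf = new SparkConf()
  .setAppName("io-bound-job")
  .set("spark.cores.max", "12")
val sc = new SparkContext(conf)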






Re: Spark 1.3.1 and Parquet Partitions

2015-05-07 Thread Olivier Girardot
"hdfs://some ip:8029/dataset/*/*.parquet" doesn't work for you ?

On Thu, May 7, 2015 at 03:32, vasuki  wrote:

> Spark 1.3.1 -
> i have a parquet file on hdfs partitioned by some string looking like this
> /dataset/city=London/data.parquet
> /dataset/city=NewYork/data.parquet
> /dataset/city=Paris/data.paruqet
> ….
>
> I am trying to load it using sqlContext.parquetFile(
> "hdfs://some ip:8029/dataset/< what do i put here >")
>
> No leads so far. Is there a way I can load the partitions? I am running on
> a cluster and not locally.
> -V
>
>
>
>
>


update resource when running spark

2015-05-07 Thread Hoai-Thu Vuong
Hi all,

I use a function to create or return the context for a Spark application; in
this function I load some resources from a text file into a list. My question
is: how do I update that list?


Re: How can I force operations to complete and spool to disk

2015-05-07 Thread Steve Lewis
I give the executor 14 GB and would like to cut it.
I expect the critical operations to run hundreds of millions of times, which
is why we run on a cluster. I will try DISK_ONLY.
Thanks

Steven Lewis sent from my phone
On May 7, 2015 10:59 AM, "ayan guha"  wrote:

> 2*2 cents:
>
> 1. You can try repartition with a large number to achieve smaller
> partitions.
> 2. OOM errors can be avoided by increasing executor memory or using
> off-heap storage.
> 3. How are you persisting? You can try persisting with the DISK_ONLY
> storage level (disk storage is always serialized; there is no
> DISK_ONLY_SER).
> 4. You may take another look at the algorithm. "Tasks typically perform
> both operations several hundred thousand times." Why can it not be done in
> a distributed way?
>
> On Thu, May 7, 2015 at 3:16 PM, Steve Lewis  wrote:
>
>> I am performing a job where I perform a number of steps in succession.
>> One step is a map on a JavaRDD which generates objects taking up
>> significant memory.
>> This is followed by a join and an aggregateByKey.
>> The problem is that the system is getting OutOfMemoryErrors:
>> most tasks work but a few fail. Tasks typically perform both operations
>> several hundred thousand times.
>> I am convinced things would work if the map ran to completion and
>> shuffled results to disk before starting the aggregateByKey.
>> I tried calling persist and then count on the results of the map to force
>> execution but this does not seem to help. Smaller partitions might also
>> help if these could be forced.
>> Any ideas?
>>
>
>
>
> --
> Best Regards,
> Ayan Guha
>


User Defined Type (UDT)

2015-05-07 Thread Wojtek Jurczyk
Hi all!

I'm using Spark 1.3.0 and I'm struggling with the definition of a new type
for a project I'm working on. I've created a case class Person(name: String)
and now I'm trying to make Spark able to serialize and deserialize the
defined type. I made a couple of attempts but none of them worked completely
(there were issues in either serialization or deserialization).

This is my class and the corresponding UDT.

import java.util

import org.apache.spark.sql.types._

@SQLUserDefinedType(udt = classOf[PersonUDT])
case class Person(name: String)

class PersonUDT extends UserDefinedType[Person] {
  override def sqlType: DataType =
    StructType(Seq(StructField("name", StringType)))

  override def serialize(obj: Any): Seq[Any] = {
    obj match {
      case c: Person =>
        Seq(c.name)
    }
  }

  override def userClass: Class[Person] = classOf[Person]

  override def deserialize(datum: Any): Person = {
    datum match {
      case values: Seq[_] =>
        assert(values.length == 1)
        Person(values.head.asInstanceOf[String])
      case values: util.ArrayList[_] =>
        Person(values.get(0).asInstanceOf[String])
    }
  }

  // In some other attempt I was creating an RDD of Seq with manually
  // serialized data, and I had to override equals because two DFs with the
  // same type weren't actually equal:
  //   StructField(person,...types.PersonUDT@a096ac3)
  //   StructField(person,...types.PersonUDT@613fd937)
  def canEqual(other: Any): Boolean = other.isInstanceOf[PersonUDT]

  override def equals(other: Any): Boolean = other match {
    case that: PersonUDT => true
    case _ => false
  }

  override def hashCode(): Int = 1
}

This is how I create an RDD of Person and then try to create a DataFrame:
val rdd = sparkContext.parallelize((1 to 100).map(i => Person(i.toString)))
val sparkDataFrame = sqlContext.createDataFrame(rdd)

The second line throws an exception:
java.lang.ClassCastException: types.PersonUDT cannot be cast to
org.apache.spark.sql.types.StructType
at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:316)

I looked into the code in SQLContext.scala and it seems that it requires the
schema to be a StructType, but a UDT extends UserDefinedType, which extends
DataType directly.
I'm not sure whether this is a bug or I just don't know how to use UDTs.

Do you have any suggestions how to solve this? I based my UDT on
ExamplePointUDT but it seems to be incorrect. Is there a working example
for UDT?
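
In the meantime, a hedged workaround sketch, not an official fix (PersonRecord is a name introduced here purely for illustration): createDataFrame wants the top-level type to be a struct, so nesting the UDT one level down avoids that cast.

case class PersonRecord(person: Person)

val rdd = sparkContext.parallelize((1 to 100).map(i => PersonRecord(Person(i.toString))))
val df = sqlContext.createDataFrame(rdd) // schema: a struct with one PersonUDT field

Separately, it may be worth checking whether serialize should return a Row rather than a Seq when sqlType is a StructType; ExamplePointUDT returns a Seq only because its sqlType is an ArrayType.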


Thank you for the reply in advance!
Wojtek


Re: How can I force operations to complete and spool to disk

2015-05-07 Thread ayan guha
2*2 cents:

1. You can try repartition with a large number to achieve smaller
partitions.
2. OOM errors can be avoided by increasing executor memory or using off-heap
storage.
3. How are you persisting? You can try persisting with the DISK_ONLY storage
level (disk storage is always serialized; there is no DISK_ONLY_SER); a
sketch follows this list.
4. You may take another look at the algorithm. "Tasks typically perform both
operations several hundred thousand times." Why can it not be done in a
distributed way?
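
A minimal sketch of points 1 and 3, assuming an existing mapped RDD of the large objects (the count 2000 is only illustrative):

import org.apache.spark.storage.StorageLevel

// Many small partitions shrink each task's working set; DISK_ONLY spools
// the materialized map output to local disk between stages.
val spooled = mapped
  .repartition(2000)
  .persist(StorageLevel.DISK_ONLY)
spooled.count() // force materialization before the join/aggregateByKey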

On Thu, May 7, 2015 at 3:16 PM, Steve Lewis  wrote:

> I am performing a job where I perform a number of steps in succession.
> One step is a map on a JavaRDD which generates objects taking up
> significant memory.
> This is followed by a join and an aggregateByKey.
> The problem is that the system is getting OutOfMemoryErrors:
> most tasks work but a few fail. Tasks typically perform both operations
> several hundred thousand times.
> I am convinced things would work if the map ran to completion and shuffled
> results to disk before starting the aggregateByKey.
> I tried calling persist and then count on the results of the map to force
> execution but this does not seem to help. Smaller partitions might also
> help if these could be forced.
> Any ideas?
>



-- 
Best Regards,
Ayan Guha


Re: Error in SparkSQL/Scala IDE

2015-05-07 Thread Iulian Dragoș
Got it!

I'll open a Jira ticket and PR when I have a working solution.

On Wed, May 6, 2015 at 11:53 PM, Michael Armbrust 
wrote:

> Hi Iulian,
>
> The relevant code is in ScalaReflection
> ,
> and it would be awesome if you could suggest how to fix this more
> generally. Specifically, this code is also broken when running from SBT:
>
>
> $ build/sbt hive/console
> scala> import implicits._
> import implicits._
>
> scala> Seq((1,2)).toDF("a", "b")
> scala.reflect.internal.MissingRequirementError: class
> org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with
> java.net.URLClassLoader@752d2e33 of type class java.net.URLClassLoader
> with classpath
> [file:/root/.sbt/boot/scala-2.10.4/lib/jline.jar,file:/root/.sbt/boot/scala-2.10.4/lib/scala-library.jar,file:/root/.sbt/boot/scala-2.10.4/lib/scala-compiler.jar,file:/root/.sbt/boot/scala-2.10.4/lib/jansi.jar,file:/root/.sbt/boot/scala-2.10.4/lib/scala-reflect.jar]
> and parent being xsbt.boot.BootFilteredLoader@625e4591 of type class
> xsbt.boot.BootFilteredLoader with classpath [] and parent being
> sun.misc.Launcher$AppClassLoader@1a8c064 of type class
> sun.misc.Launcher$AppClassLoader with classpath
> [file:/root/spark/build/sbt-launch-0.13.7.jar] and parent being
> sun.misc.Launcher$ExtClassLoader@6deee615 of type class
> sun.misc.Launcher$ExtClassLoader with classpath
> [file:/usr/lib/jvm/java-7-openjdk-amd64/jre/lib/ext/sunjce_provider.jar,file:/usr/lib/jvm/java-7-openjdk-amd64/jre/lib/ext/dnsns.jar,
> file:/usr/lib/jvm/java-7-openjdk-amd64/jre/lib/ext/java-atk-wrapper.jar,file:/usr/lib/jvm/java-7-openjdk-amd64/jre/lib/ext/sunpkcs11.jar,file:/usr/lib/jvm/java-7-openjdk-amd64/jre/lib/ext/zipfs.jar,file:/usr/lib/jvm/java-7-openjdk-amd64/jre/lib/ext/localedata.jar,file:/usr/lib/jvm/java-7-openjdk-amd64/jre/lib/ext/icedtea-sound.jar,file:/usr/lib/jvm/java-7-openjdk-amd64/jre/lib/ext/libatk-wrapper.so]
> and parent being primordial classloader with boot classpath
> [/usr/lib/jvm/java-7-openjdk-amd64/jre/lib/resources.jar:/usr/lib/jvm/java-7-openjdk-amd64/jre/lib/rt.jar:/usr/lib/jvm/java-7-openjdk-amd64/jre/lib/sunrsasign.jar:/usr/lib/jvm/java-7-openjdk-amd64/jre/lib/jsse.jar:/usr/lib/jvm/java-7-openjdk-amd64/jre/lib/jce.jar:/usr/lib/jvm/java-7-openjdk-amd64/jre/lib/charsets.jar:/usr/lib/jvm/java-7-openjdk-amd64/jre/lib/rhino.jar:/usr/lib/jvm/java-7-openjdk-amd64/jre/lib/jfr.jar:/usr/lib/jvm/java-7-openjdk-amd64/jre/classes]
> not found.
>
> Michael
>
> On Wed, May 6, 2015 at 2:12 AM, Iulian Dragoș 
> wrote:
>
>> Hi, I just saw this question. I posted my solution to this stack
>> overflow question.
>> 
>>
>> Scala reflection can take a classloader when creating a mirror (
>> universe.runtimeMirror(loader)). I can have a look, but I didn’t find
>> much about mirrors in spark-sql.
>>
>> iulian
>> ​
>>
>
>
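
A minimal sketch of the runtimeMirror idea quoted above (Scala 2.10 reflection; the lookup target is just an example):

import scala.reflect.runtime.{universe => ru}

// Build the mirror from the context classloader so classes loaded by an
// SBT/IDE launcher's loader chain are visible to reflection, instead of
// relying on the default mirror's classloader.
val loader = Thread.currentThread().getContextClassLoader
val mirror = ru.runtimeMirror(loader)
val sym = mirror.staticClass("org.apache.spark.sql.catalyst.ScalaReflection")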


-- 

--
Iulian Dragos

--
Reactive Apps on the JVM
www.typesafe.com


Re: Error in SparkSQL/Scala IDE

2015-05-07 Thread Iulian Dragoș
On Thu, May 7, 2015 at 10:18 AM, Iulian Dragoș 
wrote:

> Got it!
>
> I'll open a Jira ticket and PR when I have a working solution.
>

Scratch that, I found SPARK-5281.


>
> On Wed, May 6, 2015 at 11:53 PM, Michael Armbrust  wrote:
>
> [quoted message trimmed; it repeats the first message in this thread verbatim]


-- 

--
Iulian Dragos

--
Reactive Apps on the JVM
www.typesafe.com


Re: Parquet number of partitions

2015-05-07 Thread Archit Thakur
Hi.
The number of partitions is determined by the RDD used in the plan it
creates. It uses NewHadoopRDD, which derives its partitions from the
getSplits of the input format it uses: FilteringParquetRowInputFormat, a
subclass of ParquetInputFormat. To change the number of partitions, write a
new input format and make the NewHadoopRDD in your plan use it; or, if you
are willing to shuffle, you can use the repartition API without any code
changes.
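
A minimal sketch of the shuffle route (the target count 12 is only illustrative):

// repartition gives an explicit partition count without writing a custom
// input format, at the price of a full shuffle of the data.
val df = sqlContext.parquetFile(path)
val narrowed = df.rdd.repartition(12)
println(narrowed.partitions.length) // 12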

Thanks & Regards.

On Tue, May 5, 2015 at 7:56 PM, Masf  wrote:

> Hi Eric.
>
> Q1:
> When I read Parquet files, I've found that Spark generates as many
> partitions as there are Parquet files in the path.
>
> Q2:
> To reduce the number of partitions you can use rdd.repartition(x), where x
> is the target number of partitions. Depending on your case, repartition
> could be a heavy task.
>
>
> Regards.
> Miguel.
>
> On Tue, May 5, 2015 at 3:56 PM, Eric Eijkelenboom <
> eric.eijkelenb...@gmail.com> wrote:
>
>> Hello guys
>>
>> Q1: How does Spark determine the number of partitions when reading a
>> Parquet file?
>>
>> val df = sqlContext.parquetFile(path)
>>
>> Is it some way related to the number of Parquet row groups in my input?
>>
>> Q2: How can I reduce this number of partitions? Doing this:
>>
>> df.rdd.coalesce(200).count
>>
>> from the spark-shell causes job execution to hang…
>>
>> Any ideas? Thank you in advance.
>>
>> Eric
>>
>>
>
>
> --
>
>
> Saludos.
> Miguel Ángel
>