Re: “mapreduce.job.user.classpath.first” for Spark

2015-02-03 Thread M. Dale
Try spark.yarn.user.classpath.first (see 
https://issues.apache.org/jira/browse/SPARK-2996 - only works for YARN). 
Also thread at 
http://apache-spark-user-list.1001560.n3.nabble.com/netty-on-classpath-when-using-spark-submit-td18030.html.


HTH,
Markus

On 02/03/2015 11:20 PM, Corey Nolet wrote:
I'm having a really bad dependency conflict right now with Guava 
versions between my Spark application in Yarn and (I believe) Hadoop's 
version.


The problem is, my driver has the version of Guava which my 
application is expecting (15.0) while it appears the Spark executors 
that are working on my RDDs have a much older version (assuming it's 
the old version on the Hadoop classpath).


Is there a property like mapreduce.job.user.classpath.first' that I 
can set to make sure my own classpath is extablished first on the 
executors?




Re: “mapreduce.job.user.classpath.first” for Spark

2015-02-03 Thread bo yang
Corey,

Which version of Spark do you use? I am using Spark 1.2.0, and  guava 15.0.
It seems fine.

Best,
Bo


On Tue, Feb 3, 2015 at 8:56 PM, M. Dale medal...@yahoo.com.invalid wrote:

  Try spark.yarn.user.classpath.first (see
 https://issues.apache.org/jira/browse/SPARK-2996 - only works for YARN).
 Also thread at
 http://apache-spark-user-list.1001560.n3.nabble.com/netty-on-classpath-when-using-spark-submit-td18030.html
 .

 HTH,
 Markus

 On 02/03/2015 11:20 PM, Corey Nolet wrote:

 I'm having a really bad dependency conflict right now with Guava versions
 between my Spark application in Yarn and (I believe) Hadoop's version.

  The problem is, my driver has the version of Guava which my application
 is expecting (15.0) while it appears the Spark executors that are working
 on my RDDs have a much older version (assuming it's the old version on the
 Hadoop classpath).

  Is there a property like mapreduce.job.user.classpath.first' that I can
 set to make sure my own classpath is extablished first on the executors?





“mapreduce.job.user.classpath.first” for Spark

2015-02-03 Thread Corey Nolet
I'm having a really bad dependency conflict right now with Guava versions
between my Spark application in Yarn and (I believe) Hadoop's version.

The problem is, my driver has the version of Guava which my application is
expecting (15.0) while it appears the Spark executors that are working on
my RDDs have a much older version (assuming it's the old version on the
Hadoop classpath).

Is there a property like mapreduce.job.user.classpath.first' that I can
set to make sure my own classpath is extablished first on the executors?


HiveContext in SparkSQL - concurrency issues

2015-02-03 Thread matha.harika
Hi,

I've been trying to use HiveContext(instead of SQLContext) in my SparkSQL
application and when I run the application simultaneously, it only works on
the first call and every other call throws the following error- 

ERROR Datastore.Schema: Failed initialising database.
Failed to start database 'metastore_db' with class loader
sun.misc.Launcher$AppClassLoader@3s405f32, see the next exception for
details.
org.datanucleus.exceptions.NucleusDataStoreException: Failed to start
database 'metastore_db' with class loader
sun.misc.Launcher$AppClassLoader@3s405f32, see the next exception for
details.
at
org.datanucleus.store.rdbms.ConnectionFactoryImpl$ManagedConnectionImpl.getConnection(ConnectionFactoryImpl.java:516)
at
org.datanucleus.store.rdbms.RDBMSStoreManager.init(RDBMSStoreManager.java:298)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at
org.datanucleus.plugin.NonManagedPluginRegistry.createExecutableExtension(NonManagedPluginRegistry.java:631)
at
org.datanucleus.plugin.PluginManager.createExecutableExtension(PluginManager.java:301)
at
org.datanucleus.NucleusContext.createStoreManagerForProperties(NucleusContext.java:1187)
at org.datanucleus.NucleusContext.initialise(NucleusContext.java:356)
at
org.datanucleus.api.jdo.JDOPersistenceManagerFactory.freezeConfiguration(JDOPersistenceManagerFactory.java:775)
at
org.datanucleus.api.jdo.JDOPersistenceManagerFactory.createPersistenceManagerFactory(JDOPersistenceManagerFactory.java:333)
at
org.datanucleus.api.jdo.JDOPersistenceManagerFactory.getPersistenceManagerFactory(JDOPersistenceManagerFactory.java:202)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at javax.jdo.JDOHelper$16.run(JDOHelper.java:1965)
at java.security.AccessController.doPrivileged(Native Method)
at javax.jdo.JDOHelper.invoke(JDOHelper.java:1960)
at
javax.jdo.JDOHelper.invokeGetPersistenceManagerFactoryOnImplementation(JDOHelper.java:1166)
at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:808)
at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:701)
at org.apache.hadoop.hive.metastore.ObjectStore.getPMF(ObjectStore.java:310)
at
org.apache.hadoop.hive.metastore.ObjectStore.getPersistenceManager(ObjectStore.java:339)
at
org.apache.hadoop.hive.metastore.ObjectStore.initialize(ObjectStore.java:248)
at
org.apache.hadoop.hive.metastore.ObjectStore.setConf(ObjectStore.java:223)
at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:62)
at
org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:117)
at
org.apache.hadoop.hive.metastore.RawStoreProxy.init(RawStoreProxy.java:58)
at
org.apache.hadoop.hive.metastore.RawStoreProxy.getProxy(RawStoreProxy.java:67)
at
org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.newRawStore(HiveMetaStore.java:497)
at
org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.getMS(HiveMetaStore.java:475)
at
org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefaultDB(HiveMetaStore.java:523)
at
org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(HiveMetaStore.java:397)
at
org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(HiveMetaStore.java:356)
at
org.apache.hadoop.hive.metastore.RetryingHMSHandler.init(RetryingHMSHandler.java:54)
at
org.apache.hadoop.hive.metastore.RetryingHMSHandler.getProxy(RetryingHMSHandler.java:59)
at
org.apache.hadoop.hive.metastore.HiveMetaStore.newHMSHandler(HiveMetaStore.java:4944)
at
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.init(HiveMetaStoreClient.java:171)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at
org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1410)
at
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.init(RetryingMetaStoreClient.java:62)
at
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:72)
at
org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:2453)
at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:2465)
at
org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:340)
at
org.apache.spark.sql.hive.HiveContext$$anonfun$4.apply(HiveContext.scala:235)
at

Multiple running SparkContexts detected in the same JVM!

2015-02-03 Thread gavin zhang
I have a cluster which running CDH5.1.0 with Spark component.
Because the default version of Spark from CDH5.1.0 is 1.0.0 while I want to
use some features of Spark 1.2.0, I compiled another Spark with Maven.
But when I run into Spark-shell and created a new SparkContext, I met the
below error:

15/02/04 14:08:19 WARN SparkContext: Multiple running SparkContexts detected
in the same JVM!
org.apache.spark.SparkException: Only one SparkContext may be running in
this JVM (see SPARK-2243). To ignore this error, set
spark.driver.allowMultipleContexts = true. The currently running
SparkContext was created at
...

And I tried to delete the default Spark and
*set(spark.driver.allowMultipleContexts, true) * option, But It didn't
work.

How could I fix it? 





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Multiple-running-SparkContexts-detected-in-the-same-JVM-tp21492.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: StackOverflowError on RDD.union

2015-02-03 Thread Mark Hamstra
Use SparkContext#union[T](rdds: Seq[RDD[T]])

On Tue, Feb 3, 2015 at 7:43 PM, Thomas Kwan thomas.k...@manage.com wrote:

 I am trying to combine multiple RDDs into 1 RDD, and I am using the union
 function. I wonder if anyone has seen StackOverflowError as follows:

 Exception in thread main java.lang.StackOverflowError
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
 at
 org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
 at
 org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at
 scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
 at
 scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
 at scala.collection.AbstractTraversable.map(Traversable.scala:105)
 at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66)
 at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
 at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
 at
 org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
 at
 org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at
 scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
 at
 scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
 at scala.collection.AbstractTraversable.map(Traversable.scala:105)
 at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66)
 at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
 at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
 at scala.Option.getOrElse(Option.scala:120)




Re: connector for CouchDB

2015-02-03 Thread hnahak
Spark Doesn't support it, but this connector is open source, you can get it
from github. 

The difference between these two DB is depending on what type of solution
you are looking for. Please refer this link : 
http://blog.nahurst.com/visual-guide-to-nosql-systems 

FYI, from the list of NOSQL in above link, there are connectors available
for MongoDb and cassendra. For storing JSON I think both system support
this.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/connector-for-CouchDB-tp18630p21489.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



StackOverflowError on RDD.union

2015-02-03 Thread Thomas Kwan
I am trying to combine multiple RDDs into 1 RDD, and I am using the union
function. I wonder if anyone has seen StackOverflowError as follows:

Exception in thread main java.lang.StackOverflowError
at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at
scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at
scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at
scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at
scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
at scala.Option.getOrElse(Option.scala:120)
   


Re: Spark (SQL) as OLAP engine

2015-02-03 Thread McNerlin, Andrew (Agoda)
Hi Sean,

I'm interested in trying something similar.  How was your performance when you 
had many concurrent queries running against spark?  I know this will work well 
where you have a low volume of queries against a large dataset, but am 
concerned about having a high volume of queries against the same large dataset. 
(I know I've not defined large, but hopefully you get the gist:))

I'm using Cassandra to handle workloads where we have large amounts of low 
complexity queries, but want to move to an architecture which supports a 
similar(ish) large volume of higher complexity queries.  I'd like to use spark 
as the query serving layer, but have concerns about how many concurrent queries 
it could handle.

I'd be interested in anyones thoughts or experience with this.

Thanks,
Andrew

From: Sean McNamara 
sean.mcnam...@webtrends.commailto:sean.mcnam...@webtrends.com
Date: Wednesday, February 4, 2015 at 1:01
To: Adamantios Corais 
adamantios.cor...@gmail.commailto:adamantios.cor...@gmail.com
Cc: user@spark.apache.orgmailto:user@spark.apache.org 
user@spark.apache.orgmailto:user@spark.apache.org
Subject: Re: Spark (SQL) as OLAP engine

We have gone down a similar path at Webtrends, Spark has worked amazingly well 
for us in this use case.  Our solution goes from REST, directly into spark, and 
back out to the UI instantly.

Here is the resulting product in case you are curious (and please pardon the 
self promotion): 
https://www.webtrends.com/support-training/training/explore-onboarding/


 How can I automatically cache the data once a day...

If you are not memory-bounded you could easily cache the daily results for some 
span of time and re-union them together each time you add new data.  You would 
service queries off the unioned RDD.


 ... and make them available on a web service

From the unioned RDD you could always step into spark SQL at that point.  Or 
you could use a simple scatter/gather pattern for this.  As with all things 
Spark, this is super easy to do: just use aggregate()()!


Cheers,

Sean


On Feb 3, 2015, at 9:59 AM, Adamantios Corais 
adamantios.cor...@gmail.commailto:adamantios.cor...@gmail.com wrote:

Hi,

After some research I have decided that Spark (SQL) would be ideal for building 
an OLAP engine. My goal is to push aggregated data (to Cassandra or other 
low-latency data storage) and then be able to project the results on a web page 
(web service). New data will be added (aggregated) once a day, only. On the 
other hand, the web service must be able to run some fixed(?) queries (either 
on Spark or Spark SQL) at anytime and plot the results with D3.js. Note that I 
can already achieve similar speeds while in REPL mode by caching the data. 
Therefore, I believe that my problem must be re-phrased as follows: How can I 
automatically cache the data once a day and make them available on a web 
service that is capable of running any Spark or Spark (SQL)  statement in order 
to plot the results with D3.js?

Note that I have already some experience in Spark (+Spark SQL) as well as D3.js 
but not at all with OLAP engines (at least in their traditional form).

Any ideas or suggestions?

// Adamantios





This message is confidential and is for the sole use of the intended 
recipient(s). It may also be privileged or otherwise protected by copyright or 
other legal rules. If you have received it by mistake please let us know by 
reply email and delete it from your system. It is prohibited to copy this 
message or disclose its content to anyone. Any confidentiality or privilege is 
not waived or lost by any mistaken delivery or unauthorized disclosure of the 
message. All messages sent to and from Agoda may be monitored to ensure 
compliance with company policies, to protect the company's interests and to 
remove potential malware. Electronic messages may be intercepted, amended, lost 
or deleted, or contain viruses.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Fail to launch spark-shell on windows 2008 R2

2015-02-03 Thread Denny Lee
Hi Ningjun,

I have been working with Spark 1.2 on Windows 7 and Windows 2008 R2 (purely
for development purposes).  I had most recently installed them utilizing
Java 1.8, Scala 2.10.4, and Spark 1.2 Precompiled for Hadoop 2.4+.  A handy
thread concerning the null\bin\winutils issue is addressed in an earlier
thread at:
http://apache-spark-user-list.1001560.n3.nabble.com/Run-spark-unit-test-on-Windows-7-td8656.html

Hope this helps a little bit!
Denny





On Tue Feb 03 2015 at 8:24:24 AM Wang, Ningjun (LNG-NPV) 
ningjun.w...@lexisnexis.com wrote:

  Hi Gen



 Thanks for your feedback. We do have a business reason to run spark on
 windows. We have an existing application that is built on C# .NET running
 on windows. We are considering adding spark to the application for parallel
 processing of large data. We want spark to run on windows so it integrate
 with our existing app easily.



 Has anybody use spark on windows for production system? Is spark reliable
 on windows?



 Ningjun



 *From:* gen tang [mailto:gen.tan...@gmail.com]
 *Sent:* Thursday, January 29, 2015 12:53 PM


 *To:* Wang, Ningjun (LNG-NPV)
 *Cc:* user@spark.apache.org
 *Subject:* Re: Fail to launch spark-shell on windows 2008 R2



 Hi,



 Using spark under windows is a really bad idea, because even you solve the
 problems about hadoop, you probably will meet the problem of
 java.net.SocketException. connection reset by peer. It is caused by the
 fact we ask socket port too frequently under windows. In my knowledge, it
 is really difficult to solve. And you will find something really funny: the
 same code sometimes works and sometimes not, even in the shell mode.



 And I am sorry but I don't see the interest to run spark under windows and
 moreover using local file system in a business environment. Do you have a
 cluster in windows?



 FYI, I have used spark prebuilt on hadoop 1 under windows 7 and there is
 no problem to launch, but have problem of java.net.SocketException. If you
 are using spark prebuilt on hadoop 2, you should consider follow the
 solution provided by https://issues.apache.org/jira/browse/SPARK-2356



 Cheers

 Gen







 On Thu, Jan 29, 2015 at 5:54 PM, Wang, Ningjun (LNG-NPV) 
 ningjun.w...@lexisnexis.com wrote:

 Install virtual box which run Linux? That does not help us. We have
 business reason to run it on Windows operating system, e.g. Windows 2008 R2.



 If anybody have done that, please give some advise on what version of
 spark, which version of Hadoop do you built spark against, etc…. Note that
 we only use local file system and do not have any hdfs file system at all.
 I don’t understand why spark generate so many error on Hadoop while we
 don’t even need hdfs.



 Ningjun





 *From:* gen tang [mailto:gen.tan...@gmail.com]
 *Sent:* Thursday, January 29, 2015 10:45 AM
 *To:* Wang, Ningjun (LNG-NPV)
 *Cc:* user@spark.apache.org
 *Subject:* Re: Fail to launch spark-shell on windows 2008 R2



 Hi,



 I tried to use spark under windows once. However the only solution that I
 found is to install virtualbox



 Hope this can help you.

 Best

 Gen





 On Thu, Jan 29, 2015 at 4:18 PM, Wang, Ningjun (LNG-NPV) 
 ningjun.w...@lexisnexis.com wrote:

 I deployed spark-1.1.0 on Windows 7 and was albe to launch the
 spark-shell. I then deploy it to windows 2008 R2 and launch the
 spark-shell, I got the error



 java.lang.RuntimeException: Error while running command to get file
 permissions : java.io.IOExceptio

 n: Cannot run program ls: CreateProcess error=2, The system cannot find
 the file specified

 at java.lang.ProcessBuilder.start(Unknown Source)

 at org.apache.hadoop.util.Shell.runCommand(Shell.java:200)

 at org.apache.hadoop.util.Shell.run(Shell.java:182)

 at
 org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:375)

 at org.apache.hadoop.util.Shell.execCommand(Shell.java:461)

 at org.apache.hadoop.util.Shell.execCommand(Shell.java:444)

 at org.apache.hadoop.fs.FileUtil.execCommand(FileUtil.java:710)

 at
 org.apache.hadoop.fs.RawLocalFileSystem$RawLocalFileStatus.loadPermissionInfo(RawLocalFil

 eSystem.java:443)

 at
 org.apache.hadoop.fs.RawLocalFileSystem$RawLocalFileStatus.getPermission(RawLocalFileSyst

 em.java:418)







 Here is the detail output





 C:\spark-1.1.0\bin   spark-shell

 15/01/29 10:13:13 INFO SecurityManager: Changing view acls to:
 ningjun.wang,

 15/01/29 10:13:13 INFO SecurityManager: Changing modify acls to:
 ningjun.wang,

 15/01/29 10:13:13 INFO SecurityManager: SecurityManager: authentication
 disabled; ui acls disabled;

 users with view permissions: Set(ningjun.wang, ); users with modify
 permissions: Set(ningjun.wang, )



 15/01/29 10:13:13 INFO HttpServer: Starting HTTP Server

 15/01/29 10:13:14 INFO Server: jetty-8.y.z-SNAPSHOT

 15/01/29 10:13:14 INFO AbstractConnector: Started
 SocketConnector@0.0.0.0:53692

 15/01/29 10:13:14 INFO Utils: Successfully 

Spark SQL taking long time to print records from a table

2015-02-03 Thread jguliani
I have 3 text files in hdfs which I am reading using spark sql and
registering them as table. After that I am doing almost 5-6 operations -
including joins , group by etc.. And this whole process is taking hardly 6-7
secs. ( Source File size - 3 GB with almost 20 million rows ).
As a final step of my computation, I am expecting only 1 record in my final
rdd - named as acctNPIScr in below code snippet.

My question here is that when I am trying to print this rdd either by
registering as table and printing records from table or by this method -
acctNPIScr.map(t = Score:  + t(1)).collect().foreach(println). It is
taking very long time - almost 1.5 minute to print 1 record. 

Can someone pls help me if I am doing something wrong in printing. What is
the best way to print final result from schemardd. 

.
val acctNPIScr = sqlContext.sql(SELECT party_id,
sum(npi_int)/sum(device_priority_new) as  npi_score FROM AcctNPIScoreTemp
group by party_id ) 
acctNPIScr.registerTempTable(AcctNPIScore)

val endtime = System.currentTimeMillis()
logger.info(Total sql Time : + (endtime - st))   // this time is
hardly 5 secs

println(start printing)

val result = sqlContext.sql(SELECT * FROM
AcctNPIScore).collect().foreach(println)

//acctNPIScr.map(t = Score:  + t(1)).collect().foreach(println)

logger.info(Total printing Time : + (System.currentTimeMillis() -
endtime)) // print one record is taking almost 1.5 minute




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-taking-long-time-to-print-records-from-a-table-tp21493.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to define a file filter for file name patterns in Apache Spark Streaming in Java?

2015-02-03 Thread Emre Sevinc
Hello Akhil,

Thank you for taking your time for a detailed answer. I managed to solve it
in a very similar manner.

Kind regards,
Emre Sevinç


On Mon, Feb 2, 2015 at 8:22 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Hi Emre,

 This is how you do that in scala:

 val lines = ssc.fileStream[LongWritable, Text,
 TextInputFormat](/home/akhld/sigmoid, (t: Path) = true, true)

 ​In java you can do something like:

 jssc.ssc().LongWritable, Text,
 SequenceFileInputFormatfileStream(/home/akhld/sigmoid, new
 AbstractFunction1Path, Object() {
 @Override
 public Boolean apply(Path input) {
 //file filtering logic here

 return true;
 }
 }, true, ClassTag$.MODULE$.apply(LongWritable.class),
 ClassTag$.MODULE$.apply(Text.class),
 ClassTag$.MODULE$.apply(SequenceFileInputFormat.class));


 ​


 Thanks
 Best Regards

 On Mon, Feb 2, 2015 at 6:34 PM, Emre Sevinc emre.sev...@gmail.com wrote:

 Hello,

 I'm using Apache Spark Streaming 1.2.0 and trying to define a file filter
 for file names when creating an InputDStream
 https://spark.apache.org/docs/1.2.0/api/java/org/apache/spark/streaming/dstream/InputDStream.html
 by invoking the fileStream
 https://spark.apache.org/docs/1.2.0/api/java/org/apache/spark/streaming/StreamingContext.html#fileStream%28java.lang.String,%20scala.Function1,%20boolean,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag%29
 method. My code is working perfectly fine when I don't use a file filter,
 e.g. by invoking the other fileStream
 https://spark.apache.org/docs/1.2.0/api/java/org/apache/spark/streaming/StreamingContext.html#fileStream%28java.lang.String,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag%29
 method (described here
 https://spark.apache.org/docs/1.2.0/api/java/org/apache/spark/streaming/StreamingContext.html#fileStream%28java.lang.String,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag%29:

 https://spark.apache.org/docs/1.2.0/api/java/org/apache/spark/streaming/StreamingContext.html#fileStream%28java.lang.String,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag%29
 ).

 According to the documentation of *fileStream* method, I can pass it

   scala.Function1org.apache.hadoop.fs.Path,Object filter

 But so far, I could not create a fileFilter. My initial attempts have
 been

 1- Tried to implement it as:

 Function1Path, Object fileFilter = new Function1Path, Object() {
 @Override
 public Object apply(Path v1) {
   return true;
 }

 @Override
 public A Function1A, Object compose(Function1A, Path g) {
   return Function1$class.compose(this, g);
 }

 @Override
 public A Function1Path, A andThen(Function1Object, A g) {
   return Function1$class.andThen(this, g);
 }
   };

 But apparently my implementation of andThen is wrong, and I couldn't 
 understand how I should implement it. It complains that the anonymous 
 function:

  is not abstract and does not override abstract method 
 AandThen$mcVJ$sp(scala.Function1scala.runtime.BoxedUnit,A) in 
 scala.Function1

 2- Tried to implement it as:

 Function1Path, Object fileFilter = new AbstractFunction1Path, Object() {
 @Override
 public Object apply(Path v1) {
   return true;
 }
   };

 This one compiles, but when I run it I get an exception:

 2015-02-02 13:42:50 ERROR OneForOneStrategy:66 - myModule$1
 java.io.NotSerializableException: myModule$1
 at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
 at 
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
 at 
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
 at 
 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
 at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
 at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
 at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
 at 
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
 at 
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
 at 
 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
 at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
 at 
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
 at 
 java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
 at 
 org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply$mcV$sp(DStreamGraph.scala:169)
 at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:985)
 at 
 org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:164)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
 

Re: Spark Master Build Failing to run on cluster in standalone ClassNotFoundException: javax.servlet.FilterRegistration

2015-02-03 Thread Sean Owen
Already come up several times today:
https://issues.apache.org/jira/browse/SPARK-5557

On Tue, Feb 3, 2015 at 8:04 AM, Night Wolf nightwolf...@gmail.com wrote:
 Hi,

 I just built Spark 1.3 master using maven via make-distribution.sh;

 ./make-distribution.sh --name mapr3 --skip-java-test --tgz -Pmapr3 -Phive
 -Phive-thriftserver -Phive-0.12.0

 When trying to start the standalone spark master on a cluster I get the
 following stack trace;


 15/02/04 08:53:56 INFO slf4j.Slf4jLogger: Slf4jLogger started
 15/02/04 08:53:56 INFO Remoting: Starting remoting
 15/02/04 08:53:56 INFO Remoting: Remoting started; listening on addresses
 :[akka.tcp://sparkMaster@hadoop-009:7077]
 15/02/04 08:53:56 INFO Remoting: Remoting now listens on addresses:
 [akka.tcp://sparkMaster@hadoop-009:7077]
 ...skipping...
 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
 Method)
 at
 sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
 at
 sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
 at akka.util.Reflect$.instantiate(Reflect.scala:66)
 at akka.actor.ArgsReflectConstructor.produce(Props.scala:352)
 at akka.actor.Props.newActor(Props.scala:252)
 at akka.actor.ActorCell.newActor(ActorCell.scala:552)
 at akka.actor.ActorCell.create(ActorCell.scala:578)
 ... 9 more
 Caused by: java.lang.NoClassDefFoundError: javax/servlet/FilterRegistration
 at
 org.spark-project.jetty.servlet.ServletContextHandler.init(ServletContextHandler.java:136)
 at
 org.spark-project.jetty.servlet.ServletContextHandler.init(ServletContextHandler.java:129)
 at
 org.spark-project.jetty.servlet.ServletContextHandler.init(ServletContextHandler.java:98)
 at
 org.apache.spark.ui.JettyUtils$.createServletHandler(JettyUtils.scala:96)
 at
 org.apache.spark.ui.JettyUtils$.createServletHandler(JettyUtils.scala:87)
 at org.apache.spark.ui.WebUI.attachPage(WebUI.scala:67)
 at
 org.apache.spark.deploy.master.ui.MasterWebUI.initialize(MasterWebUI.scala:40)
 at
 org.apache.spark.deploy.master.ui.MasterWebUI.init(MasterWebUI.scala:36)
 at org.apache.spark.deploy.master.Master.init(Master.scala:95)
 ... 18 more
 Caused by: java.lang.ClassNotFoundException:
 javax.servlet.FilterRegistration
 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
 ... 27 more

 The distro seems about the right size (260MB, so I dont imagine any of the
 libraries are missing. The above command worked on 1.2...

 Any ideas whats going wrong?

 Cheers,
 N

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Is LogisticRegressionWithSGD in MLlib scalable?

2015-02-03 Thread Peng Zhang
Hi Everyone,

Is LogisticRegressionWithSGD in MLlib scalable? 

If so, what is the idea behind the scalable implementation?

Thanks in advance,

Peng





-
Peng Zhang
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Is-LogisticRegressionWithSGD-in-MLlib-scalable-tp21482.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Unable to run spark-shell after build

2015-02-03 Thread Jaonary Rabarisoa
Hi all,

I'm trying to run the master version of spark in order to test some alpha
components in ml package.

I follow the build spark documentation and build it with :

$ mvn clean package


The build is successful but when I try to run spark-shell I got the
following errror :


 *Exception in thread main java.lang.NoClassDefFoundError:
javax/servlet/http/HttpServletResponse*
*at org.apache.spark.HttpServer.org
http://org.apache.spark.HttpServer.org$apache$spark$HttpServer$$doStart(HttpServer.scala:74)*
*at
org.apache.spark.HttpServer$$anonfun$1.apply(HttpServer.scala:61)*
*at
org.apache.spark.HttpServer$$anonfun$1.apply(HttpServer.scala:61)*
*at
org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1732)*
*at
scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)*
*at
org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1723)*
*at org.apache.spark.HttpServer.start(HttpServer.scala:61)*
*at org.apache.spark.repl.SparkIMain.init(SparkIMain.scala:130)*
*at
org.apache.spark.repl.SparkILoop$SparkILoopInterpreter.init(SparkILoop.scala:185)*
*at
org.apache.spark.repl.SparkILoop.createInterpreter(SparkILoop.scala:214)*
*at
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:946)*
*at
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:942)*
*at
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:942)*
*at
scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)*
*at org.apache.spark.repl.SparkILoop.org
http://org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:942)*
*at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1039)*
*at org.apache.spark.repl.Main$.main(Main.scala:31)*
*at org.apache.spark.repl.Main.main(Main.scala)*
*at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)*
*at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)*
*at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)*
*at java.lang.reflect.Method.invoke(Method.java:606)*
*at
org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:403)*
*at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:77)*
*at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)*
*Caused by: java.lang.ClassNotFoundException:
javax.servlet.http.HttpServletResponse*
*at java.net.URLClassLoader$1.run(URLClassLoader.java:366)*
*at java.net.URLClassLoader$1.run(URLClassLoader.java:355)*
*at java.security.AccessController.doPrivileged(Native Method)*
*at java.net.URLClassLoader.findClass(URLClassLoader.java:354)*
*at java.lang.ClassLoader.loadClass(ClassLoader.java:425)*
*at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)*
*at java.lang.ClassLoader.loadClass(ClassLoader.java:358)*
*... 25 more*

What's going wrong ?



Jao


Pig loader in Spark

2015-02-03 Thread Jianshi Huang
Hi,

Anyone has implemented the default Pig Loader in Spark? (loading delimited
text files with .pig_schema)

Thanks,
-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/


connecting spark with ActiveMQ

2015-02-03 Thread Mohit Durgapal
Hi All,

I have a requirement where I need to consume messages from ActiveMQ and do
live stream processing as well as batch processing using Spark. Is there a
spark-plugin or library that can enable this? If not, then do you know any
other way this could be done?


Regards
Mohit


RE: Sort based shuffle not working properly?

2015-02-03 Thread Mohammed Guller
Nitin,
Suing Spark is not going to help. Perhaps you should sue someone else :-) Just 
kidding!

Mohammed


-Original Message-
From: nitinkak001 [mailto:nitinkak...@gmail.com] 
Sent: Tuesday, February 3, 2015 1:57 PM
To: user@spark.apache.org
Subject: Re: Sort based shuffle not working properly?

Just to add, I am suing Spark 1.1.0



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Sort-based-shuffle-not-working-properly-tp21487p21488.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Sort based shuffle not working properly?

2015-02-03 Thread Sean Owen
Hm, I don't think the sort partitioner is going to cause the result to
be ordered by c1,c2 if you only partitioned on c1. I mean, it's not
even guaranteed that the type of c2 has an ordering, right?

On Tue, Feb 3, 2015 at 3:38 PM, nitinkak001 nitinkak...@gmail.com wrote:
 I am trying to implement secondary sort in spark as we do in map-reduce.

 Here is my data(tab separated, without c1, c2, c2).
 c1c2 c3
 1   2   4
 1   3   6
 2   4   7
 2   6   8
 3   5   5
 3   1   8
 3   2   0

 To do secondary sort, I create paried RDD as

 /((c1 + ,+ c2), row)/

 and then use a custom partitioner to partition only on c1. I have set
 /spark.shuffle.manager = SORT/ so the keys per partition are sorted. For the
 key 3 I am expecting to get
 (3, 1)
 (3, 2)
 (3, 5)
 but still getting the original order
 3,5
 3,1
 3,2

 Here is the custom partitioner code:

 /class StraightPartitioner(p: Int) extends org.apache.spark.Partitioner {
   def numPartitions = p
   def getPartition(key: Any) = {
 key.asInstanceOf[String].split(,)(0).toInt
   }

 }/

 and driver code, please tell me what I am doing wrong

 /val conf = new SparkConf().setAppName(MapInheritanceExample)
 conf.set(spark.shuffle.manager, SORT);
 val sc = new SparkContext(conf)
 val pF = sc.textFile(inputFile)

 val log = LogFactory.getLog(MapFunctionTest)
 val partitionedRDD = pF.map { x =

 var arr = x.split(\t);
 (arr(0)+,+arr(1), null)

 }.partitionBy(new StraightPartitioner(10))

 var outputRDD = partitionedRDD.mapPartitions(p = {
   p.map({ case(o, n) = {
o
 }
   })
 })/



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Sort-based-shuffle-not-working-properly-tp21487.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Fail to launch spark-shell on windows 2008 R2

2015-02-03 Thread Wang, Ningjun (LNG-NPV)
Hi Gen

Thanks for your feedback. We do have a business reason to run spark on windows. 
We have an existing application that is built on C# .NET running on windows. We 
are considering adding spark to the application for parallel processing of 
large data. We want spark to run on windows so it integrate with our existing 
app easily.

Has anybody use spark on windows for production system? Is spark reliable on 
windows?

Ningjun

From: gen tang [mailto:gen.tan...@gmail.com]
Sent: Thursday, January 29, 2015 12:53 PM
To: Wang, Ningjun (LNG-NPV)
Cc: user@spark.apache.org
Subject: Re: Fail to launch spark-shell on windows 2008 R2

Hi,

Using spark under windows is a really bad idea, because even you solve the 
problems about hadoop, you probably will meet the problem of 
java.net.SocketException. connection reset by peer. It is caused by the fact we 
ask socket port too frequently under windows. In my knowledge, it is really 
difficult to solve. And you will find something really funny: the same code 
sometimes works and sometimes not, even in the shell mode.

And I am sorry but I don't see the interest to run spark under windows and 
moreover using local file system in a business environment. Do you have a 
cluster in windows?

FYI, I have used spark prebuilt on hadoop 1 under windows 7 and there is no 
problem to launch, but have problem of java.net.SocketException. If you are 
using spark prebuilt on hadoop 2, you should consider follow the solution 
provided by https://issues.apache.org/jira/browse/SPARK-2356

Cheers
Gen



On Thu, Jan 29, 2015 at 5:54 PM, Wang, Ningjun (LNG-NPV) 
ningjun.w...@lexisnexis.commailto:ningjun.w...@lexisnexis.com wrote:
Install virtual box which run Linux? That does not help us. We have business 
reason to run it on Windows operating system, e.g. Windows 2008 R2.

If anybody have done that, please give some advise on what version of spark, 
which version of Hadoop do you built spark against, etc…. Note that we only use 
local file system and do not have any hdfs file system at all. I don’t 
understand why spark generate so many error on Hadoop while we don’t even need 
hdfs.

Ningjun


From: gen tang [mailto:gen.tan...@gmail.commailto:gen.tan...@gmail.com]
Sent: Thursday, January 29, 2015 10:45 AM
To: Wang, Ningjun (LNG-NPV)
Cc: user@spark.apache.orgmailto:user@spark.apache.org
Subject: Re: Fail to launch spark-shell on windows 2008 R2

Hi,

I tried to use spark under windows once. However the only solution that I found 
is to install virtualbox

Hope this can help you.
Best
Gen


On Thu, Jan 29, 2015 at 4:18 PM, Wang, Ningjun (LNG-NPV) 
ningjun.w...@lexisnexis.commailto:ningjun.w...@lexisnexis.com wrote:
I deployed spark-1.1.0 on Windows 7 and was albe to launch the spark-shell. I 
then deploy it to windows 2008 R2 and launch the spark-shell, I got the error

java.lang.RuntimeException: Error while running command to get file permissions 
: java.io.IOExceptio
n: Cannot run program ls: CreateProcess error=2, The system cannot find the 
file specified
at java.lang.ProcessBuilder.start(Unknown Source)
at org.apache.hadoop.util.Shell.runCommand(Shell.java:200)
at org.apache.hadoop.util.Shell.run(Shell.java:182)
at 
org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:375)
at org.apache.hadoop.util.Shell.execCommand(Shell.java:461)
at org.apache.hadoop.util.Shell.execCommand(Shell.java:444)
at org.apache.hadoop.fs.FileUtil.execCommand(FileUtil.java:710)
at 
org.apache.hadoop.fs.RawLocalFileSystem$RawLocalFileStatus.loadPermissionInfo(RawLocalFil
eSystem.java:443)
at 
org.apache.hadoop.fs.RawLocalFileSystem$RawLocalFileStatus.getPermission(RawLocalFileSyst
em.java:418)



Here is the detail output


C:\spark-1.1.0\bin   spark-shell
15/01/29 10:13:13 INFO SecurityManager: Changing view acls to: ningjun.wang,
15/01/29 10:13:13 INFO SecurityManager: Changing modify acls to: ningjun.wang,
15/01/29 10:13:13 INFO SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled;
users with view permissions: Set(ningjun.wang, ); users with modify 
permissions: Set(ningjun.wang, )

15/01/29 10:13:13 INFO HttpServer: Starting HTTP Server
15/01/29 10:13:14 INFO Server: jetty-8.y.z-SNAPSHOT
15/01/29 10:13:14 INFO AbstractConnector: Started 
SocketConnector@0.0.0.0:53692http://SocketConnector@0.0.0.0:53692
15/01/29 10:13:14 INFO Utils: Successfully started service 'HTTP class server' 
on port 53692.
Failed to created SparkJLineReader: java.lang.NoClassDefFoundError: Could not 
initialize class org.f
usesource.jansi.internal.Kernel32
Falling back to SimpleReader.
Welcome to
    __
 / __/__  ___ _/ /__
_\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.1.0
  /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71)
Type in expressions to have them evaluated.
Type :help for more information.
15/01/29 10:13:18 INFO 

Re: ephemeral-hdfs vs persistent-hdfs - performance

2015-02-03 Thread David Rosenstrauch
You could also just push the data to Amazon S3, which would un-link the 
size of the cluster needed to process the data from the size of the data.


DR

On 02/03/2015 11:43 AM, Joe Wass wrote:

I want to process about 800 GB of data on an Amazon EC2 cluster. So, I need
to store the input in HDFS somehow.

I currently have a cluster of 5 x m3.xlarge, each of which has 80GB disk.
Each HDFS node reports 73 GB, and the total capacity is ~370 GB.

If I want to process 800 GB of data (assuming I can't split the jobs up),
I'm guessing I need to get persistent-hdfs involved.

1 - Does persistent-hdfs have noticeably different performance than
ephemeral-hdfs?
2 - If so, is there a recommended configuration (like storing input and
output on persistent, but persisted RDDs on ephemeral?)

This seems like a common use-case, so sorry if this has already been
covered.

Joe




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark (SQL) as OLAP engine

2015-02-03 Thread Adamantios Corais
Hi,

After some research I have decided that Spark (SQL) would be ideal for
building an OLAP engine. My goal is to push aggregated data (to Cassandra
or other low-latency data storage) and then be able to project the results
on a web page (web service). New data will be added (aggregated) once a
day, only. On the other hand, the web service must be able to run some
fixed(?) queries (either on Spark or Spark SQL) at anytime and plot the
results with D3.js. Note that I can already achieve similar speeds while in
REPL mode by caching the data. Therefore, I believe that my problem must be
re-phrased as follows: How can I automatically cache the data once a day
and make them available on a web service that is capable of running any
Spark or Spark (SQL)  statement in order to plot the results with D3.js?

Note that I have already some experience in Spark (+Spark SQL) as well as
D3.js but not at all with OLAP engines (at least in their traditional form).

Any ideas or suggestions?


*// Adamantios*


Re: Spark on Yarn: java.lang.IllegalArgumentException: Invalid rule

2015-02-03 Thread maven

The version I'm using was already pre-built for Hadoop 2.3. 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-on-Yarn-java-lang-IllegalArgumentException-Invalid-rule-tp21382p21485.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



ephemeral-hdfs vs persistent-hdfs - performance

2015-02-03 Thread Joe Wass
I want to process about 800 GB of data on an Amazon EC2 cluster. So, I need
to store the input in HDFS somehow.

I currently have a cluster of 5 x m3.xlarge, each of which has 80GB disk.
Each HDFS node reports 73 GB, and the total capacity is ~370 GB.

If I want to process 800 GB of data (assuming I can't split the jobs up),
I'm guessing I need to get persistent-hdfs involved.

1 - Does persistent-hdfs have noticeably different performance than
ephemeral-hdfs?
2 - If so, is there a recommended configuration (like storing input and
output on persistent, but persisted RDDs on ephemeral?)

This seems like a common use-case, so sorry if this has already been
covered.

Joe


Supported Notebooks (and other viz tools) for Spark 0.9.1?

2015-02-03 Thread Adamantios Corais
Hi,

I am using Spark 0.9.1 and I am looking for a proper viz tools that
supports that specific version. As far as I have seen all relevant tools
(e.g. spark-notebook, zeppelin-project etc) only support 1.1 or 1.2; no
mentions about older versions of Spark. Any ideas or suggestions?


*// Adamantios*


Re: Writing RDD to a csv file

2015-02-03 Thread Gerard Maas
this is more of a scala question, so probably next time you'd like to
address a Scala forum eg. http://stackoverflow.com/questions/tagged/scala

val optArrStr:Option[Array[String]] = ???
optArrStr.map(arr = arr.mkString(,)).getOrElse()  // empty string or
whatever default value you have for this.

kr, Gerard.

On Tue, Feb 3, 2015 at 2:09 PM, kundan kumar iitr.kun...@gmail.com wrote:

 I have a RDD which is of type

 org.apache.spark.rdd.RDD[(String, (Array[String], Option[Array[String]]))]

 I want to write it as a csv file.

 Please suggest how this can be done.

 myrdd.map(line = (line._1 + , + line._2._1.mkString(,) + , +
 line._2._2.mkString(','))).saveAsTextFile(hdfs://...)

 Doing mkString on line._2._1 works but does not work for the Option type.

 Please suggest how this can be done.


 Thanks
 Kundan





Writing RDD to a csv file

2015-02-03 Thread kundan kumar
I have a RDD which is of type

org.apache.spark.rdd.RDD[(String, (Array[String], Option[Array[String]]))]

I want to write it as a csv file.

Please suggest how this can be done.

myrdd.map(line = (line._1 + , + line._2._1.mkString(,) + , +
line._2._2.mkString(','))).saveAsTextFile(hdfs://...)

Doing mkString on line._2._1 works but does not work for the Option type.

Please suggest how this can be done.


Thanks
Kundan


Re: Writing RDD to a csv file

2015-02-03 Thread kundan kumar
Thanks Gerard !!

This is working.

On Tue, Feb 3, 2015 at 6:44 PM, Gerard Maas gerard.m...@gmail.com wrote:

 this is more of a scala question, so probably next time you'd like to
 address a Scala forum eg. http://stackoverflow.com/questions/tagged/scala

 val optArrStr:Option[Array[String]] = ???
 optArrStr.map(arr = arr.mkString(,)).getOrElse()  // empty string or
 whatever default value you have for this.

 kr, Gerard.

 On Tue, Feb 3, 2015 at 2:09 PM, kundan kumar iitr.kun...@gmail.com
 wrote:

 I have a RDD which is of type

 org.apache.spark.rdd.RDD[(String, (Array[String], Option[Array[String]]))]

 I want to write it as a csv file.

 Please suggest how this can be done.

 myrdd.map(line = (line._1 + , + line._2._1.mkString(,) + , +
 line._2._2.mkString(','))).saveAsTextFile(hdfs://...)

 Doing mkString on line._2._1 works but does not work for the Option type.

 Please suggest how this can be done.


 Thanks
 Kundan






Re: Supported Notebooks (and other viz tools) for Spark 0.9.1?

2015-02-03 Thread andy petrella
Hello Adamantios,

Thanks for the poke and the interest.
Actually, you're the second asking about backporting it. Yesterday (late),
I created a branch for it... and the simple local spark test worked! \o/.
However, it'll be the 'old' UI :-/. Since I didn't ported the code using
play 2.2.6 to the new ui.
FYI: play 2.2.6 uses a compliant akka version, that's why I mention it.

It was too late for a push :-D, so I'll commit and push this evening.
At least, you can try it tomorrow.

I shall be on gitter this evening as well if there are questions:
https://gitter.im/andypetrella/spark-notebook

Cheers,
andy

On Tue Feb 03 2015 at 2:05:35 PM Adamantios Corais 
adamantios.cor...@gmail.com wrote:

 Hi,

 I am using Spark 0.9.1 and I am looking for a proper viz tools that
 supports that specific version. As far as I have seen all relevant tools
 (e.g. spark-notebook, zeppelin-project etc) only support 1.1 or 1.2; no
 mentions about older versions of Spark. Any ideas or suggestions?


 *// Adamantios*





Spark Master Build Failing to run on cluster in standalone ClassNotFoundException: javax.servlet.FilterRegistration

2015-02-03 Thread Night Wolf
Hi,

I just built Spark 1.3 master using maven via make-distribution.sh;

./make-distribution.sh --name mapr3 --skip-java-test --tgz -Pmapr3 -Phive
-Phive-thriftserver -Phive-0.12.0

When trying to start the standalone spark master on a cluster I get the
following stack trace;


15/02/04 08:53:56 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/02/04 08:53:56 INFO Remoting: Starting remoting
15/02/04 08:53:56 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://sparkMaster@hadoop-009:7077]
15/02/04 08:53:56 INFO Remoting: Remoting now listens on addresses:
[akka.tcp://sparkMaster@hadoop-009:7077]
...skipping...
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at akka.util.Reflect$.instantiate(Reflect.scala:66)
at akka.actor.ArgsReflectConstructor.produce(Props.scala:352)
at akka.actor.Props.newActor(Props.scala:252)
at akka.actor.ActorCell.newActor(ActorCell.scala:552)
at akka.actor.ActorCell.create(ActorCell.scala:578)
... 9 more
Caused by: java.lang.NoClassDefFoundError: javax/servlet/FilterRegistration
at
org.spark-project.jetty.servlet.ServletContextHandler.init(ServletContextHandler.java:136)
at
org.spark-project.jetty.servlet.ServletContextHandler.init(ServletContextHandler.java:129)
at
org.spark-project.jetty.servlet.ServletContextHandler.init(ServletContextHandler.java:98)
at
org.apache.spark.ui.JettyUtils$.createServletHandler(JettyUtils.scala:96)
at
org.apache.spark.ui.JettyUtils$.createServletHandler(JettyUtils.scala:87)
at org.apache.spark.ui.WebUI.attachPage(WebUI.scala:67)
at
org.apache.spark.deploy.master.ui.MasterWebUI.initialize(MasterWebUI.scala:40)
at
org.apache.spark.deploy.master.ui.MasterWebUI.init(MasterWebUI.scala:36)
at org.apache.spark.deploy.master.Master.init(Master.scala:95)
... 18 more
Caused by: java.lang.ClassNotFoundException:
javax.servlet.FilterRegistration
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
... 27 more

The distro seems about the right size (260MB, so I dont imagine any of the
libraries are missing. The above command worked on 1.2...

Any ideas whats going wrong?

Cheers,
N


Re: Unable to run spark-shell after build

2015-02-03 Thread Sean Owen
Yes, I see this too. I think the Jetty shading still needs a tweak.
It's not finding the servlet API classes. Let's converge on SPARK-5557
to discuss.

On Tue, Feb 3, 2015 at 2:04 AM, Jaonary Rabarisoa jaon...@gmail.com wrote:
 Hi all,

 I'm trying to run the master version of spark in order to test some alpha
 components in ml package.

 I follow the build spark documentation and build it with :

 $ mvn clean package


 The build is successful but when I try to run spark-shell I got the
 following errror :


  Exception in thread main java.lang.NoClassDefFoundError:
 javax/servlet/http/HttpServletResponse
 at
 org.apache.spark.HttpServer.org$apache$spark$HttpServer$$doStart(HttpServer.scala:74)
 at org.apache.spark.HttpServer$$anonfun$1.apply(HttpServer.scala:61)
 at org.apache.spark.HttpServer$$anonfun$1.apply(HttpServer.scala:61)
 at
 org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1732)
 at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
 at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1723)
 at org.apache.spark.HttpServer.start(HttpServer.scala:61)
 at org.apache.spark.repl.SparkIMain.init(SparkIMain.scala:130)
 at
 org.apache.spark.repl.SparkILoop$SparkILoopInterpreter.init(SparkILoop.scala:185)
 at
 org.apache.spark.repl.SparkILoop.createInterpreter(SparkILoop.scala:214)
 at
 org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:946)
 at
 org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:942)
 at
 org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:942)
 at
 scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
 at
 org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:942)
 at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1039)
 at org.apache.spark.repl.Main$.main(Main.scala:31)
 at org.apache.spark.repl.Main.main(Main.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
 org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:403)
 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:77)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
 Caused by: java.lang.ClassNotFoundException:
 javax.servlet.http.HttpServletResponse
 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
 ... 25 more

 What's going wrong ?



 Jao

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: GraphX: ShortestPaths does not terminate on a grid graph

2015-02-03 Thread Jay Hutfles
I think this is a separate issue with how the EdgeRDDImpl partitions
edges.  If you can merge this change in and rebuild, it should work:

   https://github.com/apache/spark/pull/4136/files

If you can't, I just called the Graph.partitonBy() method right after
construction my graph but before performing any operations on it.  That
way, the EdgeRDDImpl class doesn't have to use the default partitioner.

Hope this helps!
   Jay

On Tue Feb 03 2015 at 12:35:14 AM NicolasC nicolas.ch...@inria.fr wrote:

 On 01/29/2015 08:31 PM, Ankur Dave wrote:
  Thanks for the reminder. I just created a PR:
  https://github.com/apache/spark/pull/4273
  Ankur
 

 Hello,

 Thanks for the patch. I applied it on Pregel.scala (in Spark 1.2.0
 sources) and rebuilt
 Spark. During execution, at the 25th iteration of Pregel, checkpointing is
 done and then
 it throws the following exception :

 Exception in thread main org.apache.spark.SparkException: Checkpoint
 RDD CheckpointRDD[521] at reduce at VertexRDDImpl.scala:80(0) has
 different number of partitions than original RDD VertexRDD
 ZippedPartitionsRDD2[518] at zipPartitions at VertexRDDImpl.scala:170(2)
 at org.apache.spark.rdd.RDDCheckpointData.doCheckpoint(
 RDDCheckpointData.scala:98)
 at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1279)
 at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(
 RDD.scala:1281)
 at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(
 RDD.scala:1281)
 at scala.collection.immutable.List.foreach(List.scala:318)
 at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1281)
 at org.apache.spark.SparkContext.runJob(SparkContext.scala:1285)
 at org.apache.spark.SparkContext.runJob(SparkContext.scala:1351)
 at org.apache.spark.rdd.RDD.reduce(RDD.scala:867)
 at org.apache.spark.graphx.impl.VertexRDDImpl.count(
 VertexRDDImpl.scala:80)
 at org.apache.spark.graphx.Pregel$.apply(Pregel.scala:155)
 at org.apache.spark.graphx.lib.ShortestPaths$.run(
 ShortestPaths.scala:69)
 

 Pregel.scala:155 is the following line in the pregel loop:

activeMessages = messages.count()


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: ephemeral-hdfs vs persistent-hdfs - performance

2015-02-03 Thread Joe Wass
The data is coming from S3 in the first place, and the results will be
uploaded back there. But even in the same availability zone, fetching 170
GB (that's gzipped) is slow. From what I understand of the pipelines,
multiple transforms on the same RDD might involve re-reading the input,
which very quickly add up in comparison to having the data locally. Unless
I persisted the data (which I am in fact doing) but that would involve
storing approximately the same amount of data in HDFS, which wouldn't fit.

Also, I understood that S3 was unsuitable for practical? See Why you
cannot use S3 as a replacement for HDFS[0]. I'd love to be proved wrong,
though, that would make things a lot easier.

[0] http://wiki.apache.org/hadoop/AmazonS3



On 3 February 2015 at 16:45, David Rosenstrauch dar...@darose.net wrote:

 You could also just push the data to Amazon S3, which would un-link the
 size of the cluster needed to process the data from the size of the data.

 DR


 On 02/03/2015 11:43 AM, Joe Wass wrote:

 I want to process about 800 GB of data on an Amazon EC2 cluster. So, I
 need
 to store the input in HDFS somehow.

 I currently have a cluster of 5 x m3.xlarge, each of which has 80GB disk.
 Each HDFS node reports 73 GB, and the total capacity is ~370 GB.

 If I want to process 800 GB of data (assuming I can't split the jobs up),
 I'm guessing I need to get persistent-hdfs involved.

 1 - Does persistent-hdfs have noticeably different performance than
 ephemeral-hdfs?
 2 - If so, is there a recommended configuration (like storing input and
 output on persistent, but persisted RDDs on ephemeral?)

 This seems like a common use-case, so sorry if this has already been
 covered.

 Joe



 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: ephemeral-hdfs vs persistent-hdfs - performance

2015-02-03 Thread David Rosenstrauch
We use S3 as a main storage for all our input data and our generated 
(output) data.  (10's of terabytes of data daily.)  We read gzipped data 
directly from S3 in our Hadoop/Spark jobs - it's not crazily slow, as 
long as you parallelize the work well by distributing the processing 
across enough machines.  (About 100 nodes, in our case.)


The way we generally operate is re: storage is:  read input directly 
from s3, write output from Hadoop/Spark jobs to HDFS, then after job is 
complete distcp the relevant output from HDFS back to S3.  Works for us 
... YMMV.  :-)


HTH,

DR

On 02/03/2015 12:32 PM, Joe Wass wrote:

The data is coming from S3 in the first place, and the results will be
uploaded back there. But even in the same availability zone, fetching 170
GB (that's gzipped) is slow. From what I understand of the pipelines,
multiple transforms on the same RDD might involve re-reading the input,
which very quickly add up in comparison to having the data locally. Unless
I persisted the data (which I am in fact doing) but that would involve
storing approximately the same amount of data in HDFS, which wouldn't fit.

Also, I understood that S3 was unsuitable for practical? See Why you
cannot use S3 as a replacement for HDFS[0]. I'd love to be proved wrong,
though, that would make things a lot easier.

[0] http://wiki.apache.org/hadoop/AmazonS3



On 3 February 2015 at 16:45, David Rosenstrauch dar...@darose.net wrote:


You could also just push the data to Amazon S3, which would un-link the
size of the cluster needed to process the data from the size of the data.

DR


On 02/03/2015 11:43 AM, Joe Wass wrote:


I want to process about 800 GB of data on an Amazon EC2 cluster. So, I
need
to store the input in HDFS somehow.

I currently have a cluster of 5 x m3.xlarge, each of which has 80GB disk.
Each HDFS node reports 73 GB, and the total capacity is ~370 GB.

If I want to process 800 GB of data (assuming I can't split the jobs up),
I'm guessing I need to get persistent-hdfs involved.

1 - Does persistent-hdfs have noticeably different performance than
ephemeral-hdfs?
2 - If so, is there a recommended configuration (like storing input and
output on persistent, but persisted RDDs on ephemeral?)

This seems like a common use-case, so sorry if this has already been
covered.

Joe




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org







-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: ephemeral-hdfs vs persistent-hdfs - performance

2015-02-03 Thread Ted Yu
Using s3a protocol (introduced in hadoop 2.6.0) would be faster compared to
s3.

The upcoming hadoop 2.7.0 contains some bug fixes for s3a.

FYI

On Tue, Feb 3, 2015 at 9:48 AM, David Rosenstrauch dar...@darose.net
wrote:

 We use S3 as a main storage for all our input data and our generated
 (output) data.  (10's of terabytes of data daily.)  We read gzipped data
 directly from S3 in our Hadoop/Spark jobs - it's not crazily slow, as long
 as you parallelize the work well by distributing the processing across
 enough machines.  (About 100 nodes, in our case.)

 The way we generally operate is re: storage is:  read input directly from
 s3, write output from Hadoop/Spark jobs to HDFS, then after job is complete
 distcp the relevant output from HDFS back to S3.  Works for us ... YMMV.
 :-)

 HTH,

 DR


 On 02/03/2015 12:32 PM, Joe Wass wrote:

 The data is coming from S3 in the first place, and the results will be
 uploaded back there. But even in the same availability zone, fetching 170
 GB (that's gzipped) is slow. From what I understand of the pipelines,
 multiple transforms on the same RDD might involve re-reading the input,
 which very quickly add up in comparison to having the data locally. Unless
 I persisted the data (which I am in fact doing) but that would involve
 storing approximately the same amount of data in HDFS, which wouldn't fit.

 Also, I understood that S3 was unsuitable for practical? See Why you
 cannot use S3 as a replacement for HDFS[0]. I'd love to be proved wrong,
 though, that would make things a lot easier.

 [0] http://wiki.apache.org/hadoop/AmazonS3



 On 3 February 2015 at 16:45, David Rosenstrauch dar...@darose.net
 wrote:

  You could also just push the data to Amazon S3, which would un-link the
 size of the cluster needed to process the data from the size of the data.

 DR


 On 02/03/2015 11:43 AM, Joe Wass wrote:

  I want to process about 800 GB of data on an Amazon EC2 cluster. So, I
 need
 to store the input in HDFS somehow.

 I currently have a cluster of 5 x m3.xlarge, each of which has 80GB
 disk.
 Each HDFS node reports 73 GB, and the total capacity is ~370 GB.

 If I want to process 800 GB of data (assuming I can't split the jobs
 up),
 I'm guessing I need to get persistent-hdfs involved.

 1 - Does persistent-hdfs have noticeably different performance than
 ephemeral-hdfs?
 2 - If so, is there a recommended configuration (like storing input and
 output on persistent, but persisted RDDs on ephemeral?)

 This seems like a common use-case, so sorry if this has already been
 covered.

 Joe



 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Error in saving schemaRDD with Decimal as Parquet

2015-02-03 Thread Manoj Samel
Hi,

Any thoughts ?

Thanks,

On Sun, Feb 1, 2015 at 12:26 PM, Manoj Samel manojsamelt...@gmail.com
wrote:

 Spark 1.2

 SchemaRDD has schema with decimal columns created like

 x1 = new StructField(a, DecimalType(14,4), true)

 x2 = new StructField(b, DecimalType(14,4), true)

 Registering as SQL Temp table and doing SQL queries on these columns ,
 including SUM etc. works fine, so the schema Decimal does not seems to be
 issue

 When doing saveAsParquetFile on the RDD, it gives following error. Not
 sure why the DecimalType in SchemaRDD is not seen by Parquet, which seems
 to see it as scala.math.BigDecimal

 java.lang.ClassCastException: scala.math.BigDecimal cannot be cast to
 org.apache.spark.sql.catalyst.types.decimal.Decimal

 at org.apache.spark.sql.parquet.MutableRowWriteSupport.consumeType(
 ParquetTableSupport.scala:359)

 at org.apache.spark.sql.parquet.MutableRowWriteSupport.write(
 ParquetTableSupport.scala:328)

 at org.apache.spark.sql.parquet.MutableRowWriteSupport.write(
 ParquetTableSupport.scala:314)

 at parquet.hadoop.InternalParquetRecordWriter.write(
 InternalParquetRecordWriter.java:120)

 at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81)

 at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37)

 at org.apache.spark.sql.parquet.InsertIntoParquetTable.org
 $apache$spark$sql$parquet$InsertIntoParquetTable$$writeShard$1(
 ParquetTableOperations.scala:308)

 at
 org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(
 ParquetTableOperations.scala:325)

 at
 org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(
 ParquetTableOperations.scala:325)

 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)

 at org.apache.spark.scheduler.Task.run(Task.scala:56)

 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)

 at java.util.concurrent.ThreadPoolExecutor.runWorker(
 ThreadPoolExecutor.java:1145)

 at java.util.concurrent.ThreadPoolExecutor$Worker.run(
 ThreadPoolExecutor.java:615)

  at java.lang.Thread.run(Thread.java:744)






Kyro serialization and OOM

2015-02-03 Thread Joe Wass
I have about 500 MB of data and I'm trying to process it on a single
`local` instance. I'm getting an Out of Memory exception. Stack trace at
the end.

Spark 1.1.1
My JVM has --Xmx2g

spark.driver.memory = 1000M
spark.executor.memory = 1000M
spark.kryoserializer.buffer.mb = 256
spark.kryoserializer.buffer.max.mb = 256

The objects I'm dealing with are well constrained. Each can be no more than
500 bytes at the very most. I ran into problems with the kryo buffer being
too small but I think that 256 MB should do the job. The docs say This
must be larger than any object you attempt to serialize. No danger of that.

My input is a single file (on average each line is 500 bytes). I'm
performing various filter, map, flatMap, groupByKey and reduceByKey. The
only 'actions' I'm performing are foreach, which inserts values into a
database.

On input, I'm parsing the lines and then persisting with DISK_ONLY.

I'm foreaching over the keys and then foreaching over the values of key
value RDDs. The docs say that groupByKey returns (K, IterableV). So the
values (which can be large) shouldn't be serialized as a single list.

So I don't think I should be loading anything larger than 256 MB at once.

My code works for small sample toy data and I'm trying it out on a bit
more. As I understand it, the way that Spark partitions data means that it
(in most cases) any job that will run on a cluster will also run on a
single instance, given enough time.

I think I've given enough memory to cover my serialization needs as I
understand them. Have I misunderstood?

Joe

Stack trace:

INFO  org.apache.spark.scheduler.TaskSetManager - Starting task 0.0 in
stage 30.0 (TID 116, localhost, PROCESS_LOCAL, 993 bytes)
INFO  org.apache.spark.executor.Executor - Running task 0.0 in stage 30.0
(TID 116)

...

ERROR org.apache.spark.executor.Executor - Exception in task 0.0 in stage
30.0 (TID 116)
java.lang.OutOfMemoryError: Java heap space
at com.esotericsoftware.kryo.io.Output.init(Output.java:35)
at
org.apache.spark.serializer.KryoSerializer.newKryoOutput(KryoSerializer.scala:58)
at
org.apache.spark.serializer.KryoSerializerInstance.output$lzycompute(KryoSerializer.scala:151)
at
org.apache.spark.serializer.KryoSerializerInstance.output(KryoSerializer.scala:151)
at
org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:155)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:188)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

...

WARN  org.apache.spark.scheduler.TaskSetManager - Lost task 0.0 in stage
30.0 (TID 116, localhost): java.lang.OutOfMemoryError: Java heap space
com.esotericsoftware.kryo.io.Output.init(Output.java:35)

org.apache.spark.serializer.KryoSerializer.newKryoOutput(KryoSerializer.scala:58)

org.apache.spark.serializer.KryoSerializerInstance.output$lzycompute(KryoSerializer.scala:151)

org.apache.spark.serializer.KryoSerializerInstance.output(KryoSerializer.scala:151)

org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:155)

org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:188)

java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)


Re: Spark (SQL) as OLAP engine

2015-02-03 Thread Sean McNamara
We have gone down a similar path at Webtrends, Spark has worked amazingly well 
for us in this use case.  Our solution goes from REST, directly into spark, and 
back out to the UI instantly.

Here is the resulting product in case you are curious (and please pardon the 
self promotion): 
https://www.webtrends.com/support-training/training/explore-onboarding/


 How can I automatically cache the data once a day...

If you are not memory-bounded you could easily cache the daily results for some 
span of time and re-union them together each time you add new data.  You would 
service queries off the unioned RDD.


 ... and make them available on a web service

From the unioned RDD you could always step into spark SQL at that point.  Or 
you could use a simple scatter/gather pattern for this.  As with all things 
Spark, this is super easy to do: just use aggregate()()!


Cheers,

Sean


On Feb 3, 2015, at 9:59 AM, Adamantios Corais 
adamantios.cor...@gmail.commailto:adamantios.cor...@gmail.com wrote:

Hi,

After some research I have decided that Spark (SQL) would be ideal for building 
an OLAP engine. My goal is to push aggregated data (to Cassandra or other 
low-latency data storage) and then be able to project the results on a web page 
(web service). New data will be added (aggregated) once a day, only. On the 
other hand, the web service must be able to run some fixed(?) queries (either 
on Spark or Spark SQL) at anytime and plot the results with D3.js. Note that I 
can already achieve similar speeds while in REPL mode by caching the data. 
Therefore, I believe that my problem must be re-phrased as follows: How can I 
automatically cache the data once a day and make them available on a web 
service that is capable of running any Spark or Spark (SQL)  statement in order 
to plot the results with D3.js?

Note that I have already some experience in Spark (+Spark SQL) as well as D3.js 
but not at all with OLAP engines (at least in their traditional form).

Any ideas or suggestions?

// Adamantios





Re: ephemeral-hdfs vs persistent-hdfs - performance

2015-02-03 Thread Joe Wass
Thanks very much, that's good to know, I'll certainly give it a look.

Can you give me a hint about you unzip your input files on the fly? I
thought that it wasn't possible to parallelize zipped inputs unless they
were unzipped before passing to Spark?

Joe

On 3 February 2015 at 17:48, David Rosenstrauch dar...@darose.net wrote:

 We use S3 as a main storage for all our input data and our generated
 (output) data.  (10's of terabytes of data daily.)  We read gzipped data
 directly from S3 in our Hadoop/Spark jobs - it's not crazily slow, as long
 as you parallelize the work well by distributing the processing across
 enough machines.  (About 100 nodes, in our case.)

 The way we generally operate is re: storage is:  read input directly from
 s3, write output from Hadoop/Spark jobs to HDFS, then after job is complete
 distcp the relevant output from HDFS back to S3.  Works for us ... YMMV.
 :-)

 HTH,

 DR


 On 02/03/2015 12:32 PM, Joe Wass wrote:

 The data is coming from S3 in the first place, and the results will be
 uploaded back there. But even in the same availability zone, fetching 170
 GB (that's gzipped) is slow. From what I understand of the pipelines,
 multiple transforms on the same RDD might involve re-reading the input,
 which very quickly add up in comparison to having the data locally. Unless
 I persisted the data (which I am in fact doing) but that would involve
 storing approximately the same amount of data in HDFS, which wouldn't fit.

 Also, I understood that S3 was unsuitable for practical? See Why you
 cannot use S3 as a replacement for HDFS[0]. I'd love to be proved wrong,
 though, that would make things a lot easier.

 [0] http://wiki.apache.org/hadoop/AmazonS3



 On 3 February 2015 at 16:45, David Rosenstrauch dar...@darose.net
 wrote:

  You could also just push the data to Amazon S3, which would un-link the
 size of the cluster needed to process the data from the size of the data.

 DR


 On 02/03/2015 11:43 AM, Joe Wass wrote:

  I want to process about 800 GB of data on an Amazon EC2 cluster. So, I
 need
 to store the input in HDFS somehow.

 I currently have a cluster of 5 x m3.xlarge, each of which has 80GB
 disk.
 Each HDFS node reports 73 GB, and the total capacity is ~370 GB.

 If I want to process 800 GB of data (assuming I can't split the jobs
 up),
 I'm guessing I need to get persistent-hdfs involved.

 1 - Does persistent-hdfs have noticeably different performance than
 ephemeral-hdfs?
 2 - If so, is there a recommended configuration (like storing input and
 output on persistent, but persisted RDDs on ephemeral?)

 This seems like a common use-case, so sorry if this has already been
 covered.

 Joe



 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




GraphX pregel: getting the current iteration number

2015-02-03 Thread Matthew Cornell
Hi Folks,

I'm new to GraphX and Scala and my sendMsg function needs to index into an 
input list to my algorithm based on the pregel()() iteration number, but I 
don't see a way to access that. I see in 
https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
 that it's just an index variable i in a while loop, but is there a way for 
sendMsg to access within the loop's scope? I don't think so, so barring that, 
given Scala's functional stateless nature, what other approaches would you take 
to do this? I'm considering a closure, but a var that gets updated by all the 
sendMsgs seems a recipe for trouble.

Thank you,

matt
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark (SQL) as OLAP engine

2015-02-03 Thread Jonathan Haddad
Write out the rdd to a cassandra table.  The datastax driver provides
saveToCassandra() for this purpose.

On Tue Feb 03 2015 at 8:59:15 AM Adamantios Corais 
adamantios.cor...@gmail.com wrote:

 Hi,

 After some research I have decided that Spark (SQL) would be ideal for
 building an OLAP engine. My goal is to push aggregated data (to Cassandra
 or other low-latency data storage) and then be able to project the results
 on a web page (web service). New data will be added (aggregated) once a
 day, only. On the other hand, the web service must be able to run some
 fixed(?) queries (either on Spark or Spark SQL) at anytime and plot the
 results with D3.js. Note that I can already achieve similar speeds while in
 REPL mode by caching the data. Therefore, I believe that my problem must be
 re-phrased as follows: How can I automatically cache the data once a day
 and make them available on a web service that is capable of running any
 Spark or Spark (SQL)  statement in order to plot the results with D3.js?

 Note that I have already some experience in Spark (+Spark SQL) as well as
 D3.js but not at all with OLAP engines (at least in their traditional form).

 Any ideas or suggestions?


 *// Adamantios*





Re: Setting maxPrintString in Spark Repl to view SQL query plans

2015-02-03 Thread Michael Armbrust
I'll add i usually just do

println(query.queryExecution)

On Tue, Feb 3, 2015 at 11:34 AM, Michael Armbrust mich...@databricks.com
wrote:

 You should be able to do something like:

 sbt -Dscala.repl.maxprintstring=64000 hive/console

 Here's an overview of catalyst:
 https://docs.google.com/a/databricks.com/document/d/1Hc_Ehtr0G8SQUg69cmViZsMi55_Kf3tISD9GPGU5M1Y/edit#heading=h.vp2tej73rtm2

 On Tue, Feb 3, 2015 at 1:37 AM, Mick Davies michael.belldav...@gmail.com
 wrote:

 Hi,

 I want to increase the maxPrintString the Spark repl to look at SQL query
 plans, as they are truncated by default at 800 chars, but don't know how
 to
 set this. You don't seem to be able to do it in the same way as you would
 with with Scala repl.


 Anyone know how to set this?

 Also anyone know a good document describing the interpretation of Spark
 SQL
 query plans?


 Thanks Mick



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Setting-maxPrintString-in-Spark-Repl-to-view-SQL-query-plans-tp21476.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Supported Notebooks (and other viz tools) for Spark 0.9.1?

2015-02-03 Thread andy petrella
Adamantios,

As said, I backported it to 0.9.x and now it's pushed on this branch:
https://github.com/andypetrella/spark-notebook/tree/spark-0.9.x.

I didn't created dist atm, because I'd prefer to do it only if necessary
:-).
So, if you want to try it out, just clone the repo, checked out in this
branch and launch `sbt run`.

HTH,
andy

On Tue Feb 03 2015 at 2:45:43 PM andy petrella andy.petre...@gmail.com
wrote:

 Hello Adamantios,

 Thanks for the poke and the interest.
 Actually, you're the second asking about backporting it. Yesterday (late),
 I created a branch for it... and the simple local spark test worked! \o/.
 However, it'll be the 'old' UI :-/. Since I didn't ported the code using
 play 2.2.6 to the new ui.
 FYI: play 2.2.6 uses a compliant akka version, that's why I mention it.

 It was too late for a push :-D, so I'll commit and push this evening.
 At least, you can try it tomorrow.

 I shall be on gitter this evening as well if there are questions:
 https://gitter.im/andypetrella/spark-notebook

 Cheers,
 andy

 On Tue Feb 03 2015 at 2:05:35 PM Adamantios Corais 
 adamantios.cor...@gmail.com wrote:

 Hi,

 I am using Spark 0.9.1 and I am looking for a proper viz tools that
 supports that specific version. As far as I have seen all relevant tools
 (e.g. spark-notebook, zeppelin-project etc) only support 1.1 or 1.2; no
 mentions about older versions of Spark. Any ideas or suggestions?


 *// Adamantios*





Re: GraphX pregel: getting the current iteration number

2015-02-03 Thread Daniil Osipov
I don't think its possible to access. What I've done before is send the
current or next iteration index with the message, where the message is a
case class.

HTH
Dan

On Tue, Feb 3, 2015 at 10:20 AM, Matthew Cornell corn...@cs.umass.edu
wrote:

 Hi Folks,

 I'm new to GraphX and Scala and my sendMsg function needs to index into an
 input list to my algorithm based on the pregel()() iteration number, but I
 don't see a way to access that. I see in
 https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
 that it's just an index variable i in a while loop, but is there a way
 for sendMsg to access within the loop's scope? I don't think so, so barring
 that, given Scala's functional stateless nature, what other approaches
 would you take to do this? I'm considering a closure, but a var that gets
 updated by all the sendMsgs seems a recipe for trouble.

 Thank you,

 matt
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Setting maxPrintString in Spark Repl to view SQL query plans

2015-02-03 Thread Michael Armbrust
You should be able to do something like:

sbt -Dscala.repl.maxprintstring=64000 hive/console

Here's an overview of catalyst:
https://docs.google.com/a/databricks.com/document/d/1Hc_Ehtr0G8SQUg69cmViZsMi55_Kf3tISD9GPGU5M1Y/edit#heading=h.vp2tej73rtm2

On Tue, Feb 3, 2015 at 1:37 AM, Mick Davies michael.belldav...@gmail.com
wrote:

 Hi,

 I want to increase the maxPrintString the Spark repl to look at SQL query
 plans, as they are truncated by default at 800 chars, but don't know how to
 set this. You don't seem to be able to do it in the same way as you would
 with with Scala repl.


 Anyone know how to set this?

 Also anyone know a good document describing the interpretation of Spark SQL
 query plans?


 Thanks Mick



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Setting-maxPrintString-in-Spark-Repl-to-view-SQL-query-plans-tp21476.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: ephemeral-hdfs vs persistent-hdfs - performance

2015-02-03 Thread David Rosenstrauch
Not all of our input files are zipped.  The ones that are obviously are 
not parallelized - they're just processed by a single task.  Not a big 
issue for us, though, as the those zipped files aren't too big.


DR

On 02/03/2015 01:08 PM, Joe Wass wrote:

Thanks very much, that's good to know, I'll certainly give it a look.

Can you give me a hint about you unzip your input files on the fly? I
thought that it wasn't possible to parallelize zipped inputs unless they
were unzipped before passing to Spark?

Joe

On 3 February 2015 at 17:48, David Rosenstrauch dar...@darose.net wrote:


We use S3 as a main storage for all our input data and our generated
(output) data.  (10's of terabytes of data daily.)  We read gzipped data
directly from S3 in our Hadoop/Spark jobs - it's not crazily slow, as long
as you parallelize the work well by distributing the processing across
enough machines.  (About 100 nodes, in our case.)

The way we generally operate is re: storage is:  read input directly from
s3, write output from Hadoop/Spark jobs to HDFS, then after job is complete
distcp the relevant output from HDFS back to S3.  Works for us ... YMMV.
:-)

HTH,

DR


On 02/03/2015 12:32 PM, Joe Wass wrote:


The data is coming from S3 in the first place, and the results will be
uploaded back there. But even in the same availability zone, fetching 170
GB (that's gzipped) is slow. From what I understand of the pipelines,
multiple transforms on the same RDD might involve re-reading the input,
which very quickly add up in comparison to having the data locally. Unless
I persisted the data (which I am in fact doing) but that would involve
storing approximately the same amount of data in HDFS, which wouldn't fit.

Also, I understood that S3 was unsuitable for practical? See Why you
cannot use S3 as a replacement for HDFS[0]. I'd love to be proved wrong,
though, that would make things a lot easier.

[0] http://wiki.apache.org/hadoop/AmazonS3



On 3 February 2015 at 16:45, David Rosenstrauch dar...@darose.net
wrote:

  You could also just push the data to Amazon S3, which would un-link the

size of the cluster needed to process the data from the size of the data.

DR


On 02/03/2015 11:43 AM, Joe Wass wrote:

  I want to process about 800 GB of data on an Amazon EC2 cluster. So, I

need
to store the input in HDFS somehow.

I currently have a cluster of 5 x m3.xlarge, each of which has 80GB
disk.
Each HDFS node reports 73 GB, and the total capacity is ~370 GB.

If I want to process 800 GB of data (assuming I can't split the jobs
up),
I'm guessing I need to get persistent-hdfs involved.

1 - Does persistent-hdfs have noticeably different performance than
ephemeral-hdfs?
2 - If so, is there a recommended configuration (like storing input and
output on persistent, but persisted RDDs on ephemeral?)

This seems like a common use-case, so sorry if this has already been
covered.

Joe



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Sort based shuffle not working properly?

2015-02-03 Thread Nitin kak
This is an exerpt from the Design document of the implementation of Sort
based shuffle.. I am thinking I might be wrong in my perception of sort
based shuffle. Dont  completely understand it though.

*Motivation*
A sort­based shuffle can be more scalable than Spark’s current hash­based
one because it doesn’t require writing a separate file for each reduce task
from each mapper. Instead, we write a single sorted file and serve ranges
of it to different reducers. In jobs with a lot of reduce tasks (say
10,000+), this saves significant memory for compression and serialization
buffers and results in more sequential disk I/O.

*Implementation*
To perform a sort­based shuffle, each map task will produce one or more
output files sorted by a key’s partition ID, then merge­sort them to yield
a single output file. Because it’s only necessary to group the keys
together into partitions, we won’t bother to also sort them within each
partition

On Tue, Feb 3, 2015 at 5:41 PM, Nitin kak nitinkak...@gmail.com wrote:

 I thought thats what sort based shuffled did, sort the keys going to the
 same partition.

 I have tried (c1, c2) as (Int, Int) tuple as well. I don't think that
 ordering of c2 type is the problem here.

 On Tue, Feb 3, 2015 at 5:21 PM, Sean Owen so...@cloudera.com wrote:

 Hm, I don't think the sort partitioner is going to cause the result to
 be ordered by c1,c2 if you only partitioned on c1. I mean, it's not
 even guaranteed that the type of c2 has an ordering, right?

 On Tue, Feb 3, 2015 at 3:38 PM, nitinkak001 nitinkak...@gmail.com
 wrote:
  I am trying to implement secondary sort in spark as we do in map-reduce.
 
  Here is my data(tab separated, without c1, c2, c2).
  c1c2 c3
  1   2   4
  1   3   6
  2   4   7
  2   6   8
  3   5   5
  3   1   8
  3   2   0
 
  To do secondary sort, I create paried RDD as
 
  /((c1 + ,+ c2), row)/
 
  and then use a custom partitioner to partition only on c1. I have set
  /spark.shuffle.manager = SORT/ so the keys per partition are sorted.
 For the
  key 3 I am expecting to get
  (3, 1)
  (3, 2)
  (3, 5)
  but still getting the original order
  3,5
  3,1
  3,2
 
  Here is the custom partitioner code:
 
  /class StraightPartitioner(p: Int) extends org.apache.spark.Partitioner
 {
def numPartitions = p
def getPartition(key: Any) = {
  key.asInstanceOf[String].split(,)(0).toInt
}
 
  }/
 
  and driver code, please tell me what I am doing wrong
 
  /val conf = new SparkConf().setAppName(MapInheritanceExample)
  conf.set(spark.shuffle.manager, SORT);
  val sc = new SparkContext(conf)
  val pF = sc.textFile(inputFile)
 
  val log = LogFactory.getLog(MapFunctionTest)
  val partitionedRDD = pF.map { x =
 
  var arr = x.split(\t);
  (arr(0)+,+arr(1), null)
 
  }.partitionBy(new StraightPartitioner(10))
 
  var outputRDD = partitionedRDD.mapPartitions(p = {
p.map({ case(o, n) = {
 o
  }
})
  })/
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Sort-based-shuffle-not-working-properly-tp21487.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 





Sort-basedshuffledesign.pdf
Description: Adobe PDF document

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Re: 2GB limit for partitions?

2015-02-03 Thread Aaron Davidson
To be clear, there is no distinction between partitions and blocks for RDD
caching (each RDD partition corresponds to 1 cache block). The distinction
is important for shuffling, where by definition N partitions are shuffled
into M partitions, creating N*M intermediate blocks. Each of these blocks
must also be smaller than 2GB, but due to their number, this is an atypical
scenario.

If you do

sc.parallelize(1 to 1e6.toInt, 1).map{i = new
Array[Byte](5e3.toInt)}.repartition(1000).count()

you should not see this error, as the 5GB initial partition was split into
1000 partitions of 5MB each, during a shuffle.

On the other hand,

sc.parallelize(1 to 1e6.toInt, 1).map{i = new
Array[Byte](5e3.toInt)}.repartition(1).count()

may have the same error as Imran showed for caching, and for the same
reason.

On Tue, Feb 3, 2015 at 3:00 PM, Imran Rashid iras...@cloudera.com wrote:

 Michael,

 you are right, there is definitely some limit at 2GB.  Here is a trivial
 example to demonstrate it:

 import org.apache.spark.storage.StorageLevel
 val d = sc.parallelize(1 to 1e6.toInt, 1).map{i = new
 Array[Byte](5e3.toInt)}.persist(StorageLevel.DISK_ONLY)
 d.count()

 It gives the same error you are observing.  I was under the same
 impression as Sean about the limits only being on blocks, not partitions --
 but clearly that isn't the case here.

 I don't know the whole story yet, but I just wanted to at least let you
 know you aren't crazy :)
 At the very least this suggests that you might need to make smaller
 partitions for now.

 Imran


 On Tue, Feb 3, 2015 at 4:58 AM, Michael Albert 
 m_albert...@yahoo.com.invalid wrote:

 Greetings!

 Thanks for the response.

 Below is an example of the exception I saw.
 I'd rather not post code at the moment, so I realize it is completely
 unreasonable to ask for a diagnosis.
 However, I will say that adding a partitionBy() was the last change
 before this error was created.


 Thanks for your time and any thoughts you might have.

 Sincerely,
  Mike



 Exception in thread main org.apache.spark.SparkException: Job aborted
 due to stage failure: Task 4 in stage 5.0 failed 4 times, most recent
 failure: Lost task 4.3 in stage 5.0 (TID 6012,
 ip-10-171-0-31.ec2.internal): java.lang.RuntimeException:
 java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
 at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
 at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
 at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
 at
 org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
 at
 org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
 at
 scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
 at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)


   --
  *From:* Sean Owen so...@cloudera.com
 *To:* Michael Albert m_albert...@yahoo.com
 *Cc:* user@spark.apache.org user@spark.apache.org
 *Sent:* Monday, February 2, 2015 10:13 PM
 *Subject:* Re: 2GB limit for partitions?

 The limit is on blocks, not partitions. Partitions have many blocks.

 It sounds like you are creating very large values in memory, but I'm
 not sure given your description. You will run into problems if a
 single object is more than 2GB, of course. More of the stack trace
 might show what is mapping that much memory.

 If you simply want data into 1000 files it's a lot simpler. Just
 repartition into 1000 partitions and save the data. If you need more
 control over what goes into which partition, use a Partitioner, yes.



 On Mon, Feb 2, 2015 at 8:40 PM, Michael Albert
 m_albert...@yahoo.com.invalid wrote:
  Greetings!
 
  SPARK-1476 says that there is a 2G limit for blocks.
  Is this the same as a 2G limit for partitions (or approximately so?)?
 
 
  What I had been attempting to do is the following.
  1) Start with a moderately large data set (currently about 100GB, but
  growing).
  2) Create about 1,000 files (yes, files) each representing a subset of
 the
  data.
 
  The current attempt I am working on is something like this.
  1) Do a map whose output key indicates which of the 1,000 files it
 will go
  into and whose value is what I will want to stick into the file.
  2) Partition the data and use 

Re: 2GB limit for partitions?

2015-02-03 Thread Michael Albert
Thank you!
This is very helpful.
-Mike

  From: Aaron Davidson ilike...@gmail.com
 To: Imran Rashid iras...@cloudera.com 
Cc: Michael Albert m_albert...@yahoo.com; Sean Owen so...@cloudera.com; 
user@spark.apache.org user@spark.apache.org 
 Sent: Tuesday, February 3, 2015 6:13 PM
 Subject: Re: 2GB limit for partitions?
   
To be clear, there is no distinction between partitions and blocks for RDD 
caching (each RDD partition corresponds to 1 cache block). The distinction is 
important for shuffling, where by definition N partitions are shuffled into M 
partitions, creating N*M intermediate blocks. Each of these blocks must also be 
smaller than 2GB, but due to their number, this is an atypical scenario.
If you do
sc.parallelize(1 to 1e6.toInt, 1).map{i = new 
Array[Byte](5e3.toInt)}.repartition(1000).count()
you should not see this error, as the 5GB initial partition was split into 1000 
partitions of 5MB each, during a shuffle.
On the other hand,
sc.parallelize(1 to 1e6.toInt, 1).map{i = new 
Array[Byte](5e3.toInt)}.repartition(1).count()

may have the same error as Imran showed for caching, and for the same reason.


On Tue, Feb 3, 2015 at 3:00 PM, Imran Rashid iras...@cloudera.com wrote:

Michael,
you are right, there is definitely some limit at 2GB.  Here is a trivial 
example to demonstrate it:
import org.apache.spark.storage.StorageLevelval d = sc.parallelize(1 to 
1e6.toInt, 1).map{i = new 
Array[Byte](5e3.toInt)}.persist(StorageLevel.DISK_ONLY)d.count()
It gives the same error you are observing.  I was under the same impression as 
Sean about the limits only being on blocks, not partitions -- but clearly that 
isn't the case here.
I don't know the whole story yet, but I just wanted to at least let you know 
you aren't crazy :)At the very least this suggests that you might need to make 
smaller partitions for now.
Imran

On Tue, Feb 3, 2015 at 4:58 AM, Michael Albert m_albert...@yahoo.com.invalid 
wrote:

Greetings!
Thanks for the response.
Below is an example of the exception I saw.I'd rather not post code at the 
moment, so I realize it is completely unreasonable to ask for a 
diagnosis.However, I will say that adding a partitionBy() was the last change 
before this error was created.

Thanks for your time and any thoughts you might have.
Sincerely, Mike


Exception in thread main org.apache.spark.SparkException: Job aborted due to 
stage failure: Task 4 in stage 5.0 failed 4 times, most recent failure: Lost 
task 4.3 in stage 5.0 (TID 6012, ip-10-171-0-31.ec2.internal): 
java.lang.RuntimeException: java.lang.IllegalArgumentException: Size exceeds 
Integer.MAX_VALUE    at 
sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)    at 
org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)    at 
org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)    at 
org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)    at 
org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307)    
at 
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
    at 
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
    at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)    
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)    at 
scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)    at 
org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)

  From: Sean Owen so...@cloudera.com
 To: Michael Albert m_albert...@yahoo.com 
Cc: user@spark.apache.org user@spark.apache.org 
 Sent: Monday, February 2, 2015 10:13 PM
 Subject: Re: 2GB limit for partitions?
   
The limit is on blocks, not partitions. Partitions have many blocks.

It sounds like you are creating very large values in memory, but I'm
not sure given your description. You will run into problems if a
single object is more than 2GB, of course. More of the stack trace
might show what is mapping that much memory.

If you simply want data into 1000 files it's a lot simpler. Just
repartition into 1000 partitions and save the data. If you need more
control over what goes into which partition, use a Partitioner, yes.



On Mon, Feb 2, 2015 at 8:40 PM, Michael Albert
m_albert...@yahoo.com.invalid wrote:
 Greetings!

 SPARK-1476 says that there is a 2G limit for blocks.
 Is this the same as a 2G limit for partitions (or approximately so?)?


 What I had been attempting to do is the following.
 1) Start with a moderately large data set (currently about 100GB, but
 growing).
 2) Create about 1,000 files (yes, files) each representing a subset of the
 data.

 The current attempt 

Re: 2GB limit for partitions?

2015-02-03 Thread Imran Rashid
Thanks for the explanations, makes sense.  For the record looks like this
was worked on a while back (and maybe the work is even close to a solution?)

https://issues.apache.org/jira/browse/SPARK-1476

and perhaps an independent solution was worked on here?

https://issues.apache.org/jira/browse/SPARK-1391


On Tue, Feb 3, 2015 at 5:20 PM, Reynold Xin r...@databricks.com wrote:

 cc dev list


 How are you saving the data? There are two relevant 2GB limits:

 1. Caching

 2. Shuffle


 For caching, a partition is turned into a single block.

 For shuffle, each map partition is partitioned into R blocks, where R =
 number of reduce tasks. It is unlikely a shuffle block  2G, although it
 can still happen.

 I think the 2nd problem is easier to fix than the 1st, because we can
 handle that in the network transport layer. It'd require us to divide the
 transfer of a very large block into multiple smaller blocks.



 On Tue, Feb 3, 2015 at 3:00 PM, Imran Rashid iras...@cloudera.com wrote:

 Michael,

 you are right, there is definitely some limit at 2GB.  Here is a trivial
 example to demonstrate it:

 import org.apache.spark.storage.StorageLevel
 val d = sc.parallelize(1 to 1e6.toInt, 1).map{i = new
 Array[Byte](5e3.toInt)}.persist(StorageLevel.DISK_ONLY)
 d.count()

 It gives the same error you are observing.  I was under the same
 impression as Sean about the limits only being on blocks, not partitions --
 but clearly that isn't the case here.

 I don't know the whole story yet, but I just wanted to at least let you
 know you aren't crazy :)
 At the very least this suggests that you might need to make smaller
 partitions for now.

 Imran


 On Tue, Feb 3, 2015 at 4:58 AM, Michael Albert 
 m_albert...@yahoo.com.invalid wrote:

 Greetings!

 Thanks for the response.

 Below is an example of the exception I saw.
 I'd rather not post code at the moment, so I realize it is completely
 unreasonable to ask for a diagnosis.
 However, I will say that adding a partitionBy() was the last change
 before this error was created.


 Thanks for your time and any thoughts you might have.

 Sincerely,
  Mike



 Exception in thread main org.apache.spark.SparkException: Job aborted
 due to stage failure: Task 4 in stage 5.0 failed 4 times, most recent
 failure: Lost task 4.3 in stage 5.0 (TID 6012,
 ip-10-171-0-31.ec2.internal): java.lang.RuntimeException:
 java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
 at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
 at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
 at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
 at
 org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
 at
 org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
 at
 scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
 at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)


   --
  *From:* Sean Owen so...@cloudera.com
 *To:* Michael Albert m_albert...@yahoo.com
 *Cc:* user@spark.apache.org user@spark.apache.org
 *Sent:* Monday, February 2, 2015 10:13 PM
 *Subject:* Re: 2GB limit for partitions?

 The limit is on blocks, not partitions. Partitions have many blocks.

 It sounds like you are creating very large values in memory, but I'm
 not sure given your description. You will run into problems if a
 single object is more than 2GB, of course. More of the stack trace
 might show what is mapping that much memory.

 If you simply want data into 1000 files it's a lot simpler. Just
 repartition into 1000 partitions and save the data. If you need more
 control over what goes into which partition, use a Partitioner, yes.



 On Mon, Feb 2, 2015 at 8:40 PM, Michael Albert
 m_albert...@yahoo.com.invalid wrote:
  Greetings!
 
  SPARK-1476 says that there is a 2G limit for blocks.
  Is this the same as a 2G limit for partitions (or approximately so?)?
 
 
  What I had been attempting to do is the following.
  1) Start with a moderately large data set (currently about 100GB, but
  growing).
  2) Create about 1,000 files (yes, files) each representing a subset of
 the
  data.
 
  The current attempt I am working on is something like this.
  1) Do a map whose output key indicates which of the 1,000 

Re: Sort based shuffle not working properly?

2015-02-03 Thread Nitin kak
I thought thats what sort based shuffled did, sort the keys going to the
same partition.

I have tried (c1, c2) as (Int, Int) tuple as well. I don't think that
ordering of c2 type is the problem here.

On Tue, Feb 3, 2015 at 5:21 PM, Sean Owen so...@cloudera.com wrote:

 Hm, I don't think the sort partitioner is going to cause the result to
 be ordered by c1,c2 if you only partitioned on c1. I mean, it's not
 even guaranteed that the type of c2 has an ordering, right?

 On Tue, Feb 3, 2015 at 3:38 PM, nitinkak001 nitinkak...@gmail.com wrote:
  I am trying to implement secondary sort in spark as we do in map-reduce.
 
  Here is my data(tab separated, without c1, c2, c2).
  c1c2 c3
  1   2   4
  1   3   6
  2   4   7
  2   6   8
  3   5   5
  3   1   8
  3   2   0
 
  To do secondary sort, I create paried RDD as
 
  /((c1 + ,+ c2), row)/
 
  and then use a custom partitioner to partition only on c1. I have set
  /spark.shuffle.manager = SORT/ so the keys per partition are sorted. For
 the
  key 3 I am expecting to get
  (3, 1)
  (3, 2)
  (3, 5)
  but still getting the original order
  3,5
  3,1
  3,2
 
  Here is the custom partitioner code:
 
  /class StraightPartitioner(p: Int) extends org.apache.spark.Partitioner {
def numPartitions = p
def getPartition(key: Any) = {
  key.asInstanceOf[String].split(,)(0).toInt
}
 
  }/
 
  and driver code, please tell me what I am doing wrong
 
  /val conf = new SparkConf().setAppName(MapInheritanceExample)
  conf.set(spark.shuffle.manager, SORT);
  val sc = new SparkContext(conf)
  val pF = sc.textFile(inputFile)
 
  val log = LogFactory.getLog(MapFunctionTest)
  val partitionedRDD = pF.map { x =
 
  var arr = x.split(\t);
  (arr(0)+,+arr(1), null)
 
  }.partitionBy(new StraightPartitioner(10))
 
  var outputRDD = partitionedRDD.mapPartitions(p = {
p.map({ case(o, n) = {
 o
  }
})
  })/
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Sort-based-shuffle-not-working-properly-tp21487.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 



Re: 2GB limit for partitions?

2015-02-03 Thread Reynold Xin
cc dev list


How are you saving the data? There are two relevant 2GB limits:

1. Caching

2. Shuffle


For caching, a partition is turned into a single block.

For shuffle, each map partition is partitioned into R blocks, where R =
number of reduce tasks. It is unlikely a shuffle block  2G, although it
can still happen.

I think the 2nd problem is easier to fix than the 1st, because we can
handle that in the network transport layer. It'd require us to divide the
transfer of a very large block into multiple smaller blocks.



On Tue, Feb 3, 2015 at 3:00 PM, Imran Rashid iras...@cloudera.com wrote:

 Michael,

 you are right, there is definitely some limit at 2GB.  Here is a trivial
 example to demonstrate it:

 import org.apache.spark.storage.StorageLevel
 val d = sc.parallelize(1 to 1e6.toInt, 1).map{i = new
 Array[Byte](5e3.toInt)}.persist(StorageLevel.DISK_ONLY)
 d.count()

 It gives the same error you are observing.  I was under the same
 impression as Sean about the limits only being on blocks, not partitions --
 but clearly that isn't the case here.

 I don't know the whole story yet, but I just wanted to at least let you
 know you aren't crazy :)
 At the very least this suggests that you might need to make smaller
 partitions for now.

 Imran


 On Tue, Feb 3, 2015 at 4:58 AM, Michael Albert 
 m_albert...@yahoo.com.invalid wrote:

 Greetings!

 Thanks for the response.

 Below is an example of the exception I saw.
 I'd rather not post code at the moment, so I realize it is completely
 unreasonable to ask for a diagnosis.
 However, I will say that adding a partitionBy() was the last change
 before this error was created.


 Thanks for your time and any thoughts you might have.

 Sincerely,
  Mike



 Exception in thread main org.apache.spark.SparkException: Job aborted
 due to stage failure: Task 4 in stage 5.0 failed 4 times, most recent
 failure: Lost task 4.3 in stage 5.0 (TID 6012,
 ip-10-171-0-31.ec2.internal): java.lang.RuntimeException:
 java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
 at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
 at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
 at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
 at
 org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
 at
 org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
 at
 scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
 at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)


   --
  *From:* Sean Owen so...@cloudera.com
 *To:* Michael Albert m_albert...@yahoo.com
 *Cc:* user@spark.apache.org user@spark.apache.org
 *Sent:* Monday, February 2, 2015 10:13 PM
 *Subject:* Re: 2GB limit for partitions?

 The limit is on blocks, not partitions. Partitions have many blocks.

 It sounds like you are creating very large values in memory, but I'm
 not sure given your description. You will run into problems if a
 single object is more than 2GB, of course. More of the stack trace
 might show what is mapping that much memory.

 If you simply want data into 1000 files it's a lot simpler. Just
 repartition into 1000 partitions and save the data. If you need more
 control over what goes into which partition, use a Partitioner, yes.



 On Mon, Feb 2, 2015 at 8:40 PM, Michael Albert
 m_albert...@yahoo.com.invalid wrote:
  Greetings!
 
  SPARK-1476 says that there is a 2G limit for blocks.
  Is this the same as a 2G limit for partitions (or approximately so?)?
 
 
  What I had been attempting to do is the following.
  1) Start with a moderately large data set (currently about 100GB, but
  growing).
  2) Create about 1,000 files (yes, files) each representing a subset of
 the
  data.
 
  The current attempt I am working on is something like this.
  1) Do a map whose output key indicates which of the 1,000 files it
 will go
  into and whose value is what I will want to stick into the file.
  2) Partition the data and use the body of mapPartition to open a file
 and
  save the data.
 
  My apologies, this is actually embedded in a bigger mess, so I won't
 post
  it.
 
  However, I get errors telling me that there is an
 IllegalArgumentException:
  Size exceeds Inter.MAX_VALUE, with 

Re: 2GB limit for partitions?

2015-02-03 Thread Imran Rashid
Michael,

you are right, there is definitely some limit at 2GB.  Here is a trivial
example to demonstrate it:

import org.apache.spark.storage.StorageLevel
val d = sc.parallelize(1 to 1e6.toInt, 1).map{i = new
Array[Byte](5e3.toInt)}.persist(StorageLevel.DISK_ONLY)
d.count()

It gives the same error you are observing.  I was under the same impression
as Sean about the limits only being on blocks, not partitions -- but
clearly that isn't the case here.

I don't know the whole story yet, but I just wanted to at least let you
know you aren't crazy :)
At the very least this suggests that you might need to make smaller
partitions for now.

Imran


On Tue, Feb 3, 2015 at 4:58 AM, Michael Albert 
m_albert...@yahoo.com.invalid wrote:

 Greetings!

 Thanks for the response.

 Below is an example of the exception I saw.
 I'd rather not post code at the moment, so I realize it is completely
 unreasonable to ask for a diagnosis.
 However, I will say that adding a partitionBy() was the last change
 before this error was created.


 Thanks for your time and any thoughts you might have.

 Sincerely,
  Mike



 Exception in thread main org.apache.spark.SparkException: Job aborted
 due to stage failure: Task 4 in stage 5.0 failed 4 times, most recent
 failure: Lost task 4.3 in stage 5.0 (TID 6012,
 ip-10-171-0-31.ec2.internal): java.lang.RuntimeException:
 java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
 at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
 at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
 at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
 at
 org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
 at
 org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
 at
 scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
 at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)


   --
  *From:* Sean Owen so...@cloudera.com
 *To:* Michael Albert m_albert...@yahoo.com
 *Cc:* user@spark.apache.org user@spark.apache.org
 *Sent:* Monday, February 2, 2015 10:13 PM
 *Subject:* Re: 2GB limit for partitions?

 The limit is on blocks, not partitions. Partitions have many blocks.

 It sounds like you are creating very large values in memory, but I'm
 not sure given your description. You will run into problems if a
 single object is more than 2GB, of course. More of the stack trace
 might show what is mapping that much memory.

 If you simply want data into 1000 files it's a lot simpler. Just
 repartition into 1000 partitions and save the data. If you need more
 control over what goes into which partition, use a Partitioner, yes.



 On Mon, Feb 2, 2015 at 8:40 PM, Michael Albert
 m_albert...@yahoo.com.invalid wrote:
  Greetings!
 
  SPARK-1476 says that there is a 2G limit for blocks.
  Is this the same as a 2G limit for partitions (or approximately so?)?
 
 
  What I had been attempting to do is the following.
  1) Start with a moderately large data set (currently about 100GB, but
  growing).
  2) Create about 1,000 files (yes, files) each representing a subset of
 the
  data.
 
  The current attempt I am working on is something like this.
  1) Do a map whose output key indicates which of the 1,000 files it
 will go
  into and whose value is what I will want to stick into the file.
  2) Partition the data and use the body of mapPartition to open a file and
  save the data.
 
  My apologies, this is actually embedded in a bigger mess, so I won't post
  it.
 
  However, I get errors telling me that there is an
 IllegalArgumentException:
  Size exceeds Inter.MAX_VALUE, with sun.nio.ch.FileChannelImpl.map at the
  top of the stack.  This leads me to think that I have hit the limit or
  partition and/or block size.
 
  Perhaps this is not a good way to do it?
 
  I suppose I could run 1,000 passes over the data, each time collecting
 the
  output for one of my 1,000 final files, but that seems likely to be
  painfully slow to run.
 
  Am I missing something?
 
  Admittedly, this is an odd use case
 
  Thanks!
 
  Sincerely,
   Mike Albert


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, 

Setting maxPrintString in Spark Repl to view SQL query plans

2015-02-03 Thread Mick Davies
Hi, 

I want to increase the maxPrintString the Spark repl to look at SQL query
plans, as they are truncated by default at 800 chars, but don't know how to
set this. You don't seem to be able to do it in the same way as you would
with with Scala repl.


Anyone know how to set this?

Also anyone know a good document describing the interpretation of Spark SQL
query plans?


Thanks Mick



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Setting-maxPrintString-in-Spark-Repl-to-view-SQL-query-plans-tp21476.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark streaming - tracking/deleting processed files

2015-02-03 Thread Prannoy
Hi,

To keep processing the older file also you can use fileStream instead of
textFileStream. It has a parameter to specify to look for already present
files.

For deleting the processed files one way is to get the list of all files in
the dStream. This can be done by using the foreachRDD api of the dStream
received from the fileStream(or textFileStream).

Suppose the dStream is

JavaDStreamString jpDstream = ssc.textFileStream(path/to/your/folder/);

jpDstream.print();

 jpDstream.foreachRDD(

 new FunctionJavaRDDString, Void(){

  @Override

  public Void call(JavaRDDString arg0) throws Exception {

  getContentHigh(arg0,ssc);

  return null;

  }

 }

 );

 public static U void getContentHigh(JavaRDDString ds,
JavaStreamingContext ssc){

int lenPartition = ds.rdd().partitions().length; // this gives the number
of files the stream picked

for(int i=0;ilenPartition;i++) {

 UnionPartition upp = (UnionPartition) listPartitions[i];

   NewHadoopPartition npp = (NewHadoopPartition) upp.parentPartition();

String fPath = npp.serializableHadoopSplit().value().toString();

String[] nT =  tmpName.split(:);

String name = nT[0]; // name is the path of the file picked for processing.
the processing logic can be inside this loop. once //done you can delete
the file using the path in the variable name


}

}


Thanks.

On Fri, Jan 30, 2015 at 11:37 PM, ganterm [via Apache Spark User List] 
ml-node+s1001560n21444...@n3.nabble.com wrote:

 We are running a Spark streaming job that retrieves files from a directory
 (using textFileStream).
 One concern we are having is the case where the job is down but files are
 still being added to the directory.
 Once the job starts up again, those files are not being picked up (since
 they are not new or changed while the job is running) but we would like
 them to be processed.
 Is there a solution for that? Is there a way to keep track what files have
 been processed and can we force older files to be picked up? Is there a
 way to delete the processed files?

 Thanks!
 Markus

 --
  If you reply to this email, your message will be added to the discussion
 below:

 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-tracking-deleting-processed-files-tp21444.html
  To start a new topic under Apache Spark User List, email
 ml-node+s1001560n1...@n3.nabble.com
 To unsubscribe from Apache Spark User List, click here
 http://apache-spark-user-list.1001560.n3.nabble.com/template/NamlServlet.jtp?macro=unsubscribe_by_codenode=1code=cHJhbm5veUBzaWdtb2lkYW5hbHl0aWNzLmNvbXwxfC0xNTI2NTg4NjQ2
 .
 NAML
 http://apache-spark-user-list.1001560.n3.nabble.com/template/NamlServlet.jtp?macro=macro_viewerid=instant_html%21nabble%3Aemail.namlbase=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespacebreadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-tracking-deleting-processed-files-tp21444p21478.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: 2GB limit for partitions?

2015-02-03 Thread Michael Albert
Greetings!
Thanks for the response.
Below is an example of the exception I saw.I'd rather not post code at the 
moment, so I realize it is completely unreasonable to ask for a 
diagnosis.However, I will say that adding a partitionBy() was the last change 
before this error was created.

Thanks for your time and any thoughts you might have.
Sincerely, Mike


Exception in thread main org.apache.spark.SparkException: Job aborted due to 
stage failure: Task 4 in stage 5.0 failed 4 times, most recent failure: Lost 
task 4.3 in stage 5.0 (TID 6012, ip-10-171-0-31.ec2.internal): 
java.lang.RuntimeException: java.lang.IllegalArgumentException: Size exceeds 
Integer.MAX_VALUE    at 
sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)    at 
org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)    at 
org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)    at 
org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)    at 
org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307)    
at 
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
    at 
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
    at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)    
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)    at 
scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)    at 
org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)

  From: Sean Owen so...@cloudera.com
 To: Michael Albert m_albert...@yahoo.com 
Cc: user@spark.apache.org user@spark.apache.org 
 Sent: Monday, February 2, 2015 10:13 PM
 Subject: Re: 2GB limit for partitions?
   
The limit is on blocks, not partitions. Partitions have many blocks.

It sounds like you are creating very large values in memory, but I'm
not sure given your description. You will run into problems if a
single object is more than 2GB, of course. More of the stack trace
might show what is mapping that much memory.

If you simply want data into 1000 files it's a lot simpler. Just
repartition into 1000 partitions and save the data. If you need more
control over what goes into which partition, use a Partitioner, yes.



On Mon, Feb 2, 2015 at 8:40 PM, Michael Albert
m_albert...@yahoo.com.invalid wrote:
 Greetings!

 SPARK-1476 says that there is a 2G limit for blocks.
 Is this the same as a 2G limit for partitions (or approximately so?)?


 What I had been attempting to do is the following.
 1) Start with a moderately large data set (currently about 100GB, but
 growing).
 2) Create about 1,000 files (yes, files) each representing a subset of the
 data.

 The current attempt I am working on is something like this.
 1) Do a map whose output key indicates which of the 1,000 files it will go
 into and whose value is what I will want to stick into the file.
 2) Partition the data and use the body of mapPartition to open a file and
 save the data.

 My apologies, this is actually embedded in a bigger mess, so I won't post
 it.

 However, I get errors telling me that there is an IllegalArgumentException:
 Size exceeds Inter.MAX_VALUE, with sun.nio.ch.FileChannelImpl.map at the
 top of the stack.  This leads me to think that I have hit the limit or
 partition and/or block size.

 Perhaps this is not a good way to do it?

 I suppose I could run 1,000 passes over the data, each time collecting the
 output for one of my 1,000 final files, but that seems likely to be
 painfully slow to run.

 Am I missing something?

 Admittedly, this is an odd use case

 Thanks!

 Sincerely,
  Mike Albert

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



  

Re: Spark Shell Timeouts

2015-02-03 Thread amoners
I am not sure that this way can help you. There is my situation that I can
not see any input in terminal after some work gets done via spark-shell, I
used to run a command  stty echo  , and It fixed.

Best,
Amoners



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Shell-Timeouts-tp21438p21479.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



LeaseExpiredException while writing schemardd to hdfs

2015-02-03 Thread Hafiz Mujadid
I want to write whole schemardd to single in hdfs but facing following
exception

rg.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException):
No lease on /test/data/data1.csv (inode 402042): File does not exist. Holder
DFSClient_NONMAPREDUCE_-564238432_57 does not have any open files 

here is my code 
rdd.foreachPartition( iterator = {
  var output = new Path( outputpath )
  val fs = FileSystem.get( new Configuration() )
  var writer : BufferedWriter = null
  writer = new BufferedWriter( new OutputStreamWriter(  fs.create(
output ) ) ) 
  var line = new StringBuilder
  iterator.foreach( row = {
row.foreach( column = {
line.append( column.toString + splitter )
} )
writer.write( line.toString.dropRight( 1 ) )
writer.newLine()
line.clear
} )
writer.close()
} )

I think problem is that I am making writer for each partition and multiple
writer are executing in parallel so when they try to write to same file then
this problem appears. 
When I avoid this approach then I face task not serializable exception

Any suggest to handle this problem?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/LeaseExpiredException-while-writing-schemardd-to-hdfs-tp21477.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Writing RDD to a csv file

2015-02-03 Thread Charles Feduke
In case anyone needs to merge all of their part-n files (small result
set only) into a single *.csv file or needs to generically flatten case
classes, tuples, etc., into comma separated values:

http://deploymentzone.com/2015/01/30/spark-and-merged-csv-files/

On Tue Feb 03 2015 at 8:23:59 AM kundan kumar iitr.kun...@gmail.com wrote:

 Thanks Gerard !!

 This is working.

 On Tue, Feb 3, 2015 at 6:44 PM, Gerard Maas gerard.m...@gmail.com wrote:

 this is more of a scala question, so probably next time you'd like to
 address a Scala forum eg. http://stackoverflow.com/questions/tagged/scala

 val optArrStr:Option[Array[String]] = ???
 optArrStr.map(arr = arr.mkString(,)).getOrElse()  // empty string or
 whatever default value you have for this.

 kr, Gerard.

 On Tue, Feb 3, 2015 at 2:09 PM, kundan kumar iitr.kun...@gmail.com
 wrote:

 I have a RDD which is of type

 org.apache.spark.rdd.RDD[(String, (Array[String],
 Option[Array[String]]))]

 I want to write it as a csv file.

 Please suggest how this can be done.

 myrdd.map(line = (line._1 + , + line._2._1.mkString(,) + , +
 line._2._2.mkString(','))).saveAsTextFile(hdfs://...)

 Doing mkString on line._2._1 works but does not work for the Option type.

 Please suggest how this can be done.


 Thanks
 Kundan







advice on diagnosing Spark stall for 1.5hr out of 3.5hr job?

2015-02-03 Thread Michael Albert
Greetings!
First, my sincere thanks to all who have given me advice.Following previous 
discussion, I've rearranged my code to try to keep the partitions to more 
manageable sizes.Thanks to all who commented.
At the moment, the input set I'm trying to work with is about 90GB (avro 
parquet format).
When I run on a reasonable chunk of the data (say half) things work reasonably.
On the full data, the spark process stalls.That is, for about 1.5 hours out of 
a 3.5 hour run, I see no activity.No cpu usage, no error message, no network 
activity.It just seems to sits there.The messages bracketing the stall are 
shown below.
Any advice on how to diagnose this? I don't get any error messages.  The spark 
UI says that it is running a stage, but it makes no discernible 
progress.Ganglia shows no CPU usage or network activity.When I shell into the 
worker nodes there are no filled disks or other obvious problems.
How can I discern what Spark is waiting for?
The only weird thing seen, other than the stall, is that the yarn logs on the 
workers have lines with messages like this:2015-02-03 22:59:58,890 INFO 
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl
 (Container Monitor): Memory usage of ProcessTree 13158 for container-id 
container_1422834185427_0083_01_21: 7.1 GB of 8.5 GB physical memory used; 
7.6 GB of 42.5 GB virtual memory used
It's rather strange that it mentions 42.5 GB of virtual memory.  The machines 
are EMR machines with 32 GB of physical memory and, as far as I can determine, 
no swap space.
The messages bracketing the stall are shown below.

Any advice is welcome.
Thanks!
Sincerely, Mike Albert
Before the stall.15/02/03 21:45:28 INFO cluster.YarnClientClusterScheduler: 
Removed TaskSet 5.0, whose tasks have all completed, from pool 15/02/03 
21:45:28 INFO scheduler.DAGScheduler: Stage 5 (mapPartitionsWithIndex at 
Transposer.scala:147) finished in 4880.317 s15/02/03 21:45:28 INFO 
scheduler.DAGScheduler: looking for newly runnable stages15/02/03 21:45:28 INFO 
scheduler.DAGScheduler: running: Set(Stage 3)15/02/03 21:45:28 INFO 
scheduler.DAGScheduler: waiting: Set(Stage 6, Stage 7, Stage 8)15/02/03 
21:45:28 INFO scheduler.DAGScheduler: failed: Set()15/02/03 21:45:28 INFO 
scheduler.DAGScheduler: Missing parents for Stage 6: List(Stage 3)15/02/03 
21:45:28 INFO scheduler.DAGScheduler: Missing parents for Stage 7: List(Stage 
6)15/02/03 21:45:28 INFO scheduler.DAGScheduler: Missing parents for Stage 8: 
List(Stage 7)At this point, I see no activity for 1.5 hours except for this 
(XXX for I.P. address)15/02/03 22:13:24 INFO util.AkkaUtils: Connecting to 
ExecutorActor: 
akka.tcp://sparkExecutor@ip-XXX.ec2.internal:36301/user/ExecutorActor
Then finally it started again:15/02/03 23:31:34 INFO scheduler.TaskSetManager: 
Finished task 1.0 in stage 3.0 (TID 7301) in 7208259 ms on 
ip-10-171-0-124.ec2.internal (3/4)15/02/03 23:31:34 INFO 
scheduler.TaskSetManager: Finished task 0.0 in stage 3.0 (TID 7300) in 7208503 
ms on ip-10-171-0-128.ec2.internal (4/4)15/02/03 23:31:34 INFO 
scheduler.DAGScheduler: Stage 3 (mapPartitions at Transposer.scala:211) 
finished in 7209.534 s




Re: ephemeral-hdfs vs persistent-hdfs - performance

2015-02-03 Thread Sven Krasser
Hey Joe,

With the ephemeral HDFS, you get the instance store of your worker nodes.
For m3.xlarge that will be two 40 GB SSDs local to each instance, which are
very fast.

For the persistent HDFS, you get whatever EBS volumes the launch script
configured. EBS volumes are always network drives, so the usual limitations
apply. To optimize throughput, you can use EBS volumes with provisioned
IOPS and you can use EBS optimized instances. I don't have hard numbers at
hand, but I'd expect this to be noticeably slower than using local SSDs.

As far as only using S3 goes, it depends on your use case (i.e. what you
plan on doing with the data while it is there). If you store it there in
between running different applications, you can likely work around
consistency issues.

Also, if you use Amazon's EMRFS to access data in S3, you can use their new
consistency feature (
https://aws.amazon.com/blogs/aws/emr-consistent-file-system/).

Hope this helps!
-Sven


On Tue, Feb 3, 2015 at 9:32 AM, Joe Wass jw...@crossref.org wrote:

 The data is coming from S3 in the first place, and the results will be
 uploaded back there. But even in the same availability zone, fetching 170
 GB (that's gzipped) is slow. From what I understand of the pipelines,
 multiple transforms on the same RDD might involve re-reading the input,
 which very quickly add up in comparison to having the data locally. Unless
 I persisted the data (which I am in fact doing) but that would involve
 storing approximately the same amount of data in HDFS, which wouldn't fit.

 Also, I understood that S3 was unsuitable for practical? See Why you
 cannot use S3 as a replacement for HDFS[0]. I'd love to be proved wrong,
 though, that would make things a lot easier.

 [0] http://wiki.apache.org/hadoop/AmazonS3



 On 3 February 2015 at 16:45, David Rosenstrauch dar...@darose.net wrote:

 You could also just push the data to Amazon S3, which would un-link the
 size of the cluster needed to process the data from the size of the data.

 DR


 On 02/03/2015 11:43 AM, Joe Wass wrote:

 I want to process about 800 GB of data on an Amazon EC2 cluster. So, I
 need
 to store the input in HDFS somehow.

 I currently have a cluster of 5 x m3.xlarge, each of which has 80GB disk.
 Each HDFS node reports 73 GB, and the total capacity is ~370 GB.

 If I want to process 800 GB of data (assuming I can't split the jobs up),
 I'm guessing I need to get persistent-hdfs involved.

 1 - Does persistent-hdfs have noticeably different performance than
 ephemeral-hdfs?
 2 - If so, is there a recommended configuration (like storing input and
 output on persistent, but persisted RDDs on ephemeral?)

 This seems like a common use-case, so sorry if this has already been
 covered.

 Joe



 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





-- 
http://sites.google.com/site/krasser/?utm_source=sig


Sort based shuffle not working properly?

2015-02-03 Thread nitinkak001
I am trying to implement secondary sort in spark as we do in map-reduce. 

Here is my data(tab separated, without c1, c2, c2). 
c1c2 c3
1   2   4
1   3   6
2   4   7
2   6   8
3   5   5
3   1   8
3   2   0

To do secondary sort, I create paried RDD as 

/((c1 + ,+ c2), row)/

and then use a custom partitioner to partition only on c1. I have set
/spark.shuffle.manager = SORT/ so the keys per partition are sorted. For the
key 3 I am expecting to get 
(3, 1)
(3, 2)
(3, 5)
but still getting the original order
3,5
3,1
3,2

Here is the custom partitioner code:

/class StraightPartitioner(p: Int) extends org.apache.spark.Partitioner {
  def numPartitions = p
  def getPartition(key: Any) = {
key.asInstanceOf[String].split(,)(0).toInt
  }

}/

and driver code, please tell me what I am doing wrong

/val conf = new SparkConf().setAppName(MapInheritanceExample)
conf.set(spark.shuffle.manager, SORT);
val sc = new SparkContext(conf)
val pF = sc.textFile(inputFile)

val log = LogFactory.getLog(MapFunctionTest)
val partitionedRDD = pF.map { x =

var arr = x.split(\t);
(arr(0)+,+arr(1), null)

}.partitionBy(new StraightPartitioner(10))

var outputRDD = partitionedRDD.mapPartitions(p = {
  p.map({ case(o, n) = {
   o
}
  })
})/



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Sort-based-shuffle-not-working-properly-tp21487.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Sort based shuffle not working properly?

2015-02-03 Thread nitinkak001
Just to add, I am suing Spark 1.1.0



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Sort-based-shuffle-not-working-properly-tp21487p21488.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark (SQL) as OLAP engine

2015-02-03 Thread Denny Lee
A great presentation by Evan Chan on utilizing Cassandra as Jonathan noted
is at: OLAP with Cassandra and Spark
http://www.slideshare.net/EvanChan2/2014-07olapcassspark.

On Tue Feb 03 2015 at 10:03:34 AM Jonathan Haddad j...@jonhaddad.com wrote:

 Write out the rdd to a cassandra table.  The datastax driver provides
 saveToCassandra() for this purpose.

 On Tue Feb 03 2015 at 8:59:15 AM Adamantios Corais 
 adamantios.cor...@gmail.com wrote:

 Hi,

 After some research I have decided that Spark (SQL) would be ideal for
 building an OLAP engine. My goal is to push aggregated data (to Cassandra
 or other low-latency data storage) and then be able to project the results
 on a web page (web service). New data will be added (aggregated) once a
 day, only. On the other hand, the web service must be able to run some
 fixed(?) queries (either on Spark or Spark SQL) at anytime and plot the
 results with D3.js. Note that I can already achieve similar speeds while in
 REPL mode by caching the data. Therefore, I believe that my problem must be
 re-phrased as follows: How can I automatically cache the data once a day
 and make them available on a web service that is capable of running any
 Spark or Spark (SQL)  statement in order to plot the results with D3.js?

 Note that I have already some experience in Spark (+Spark SQL) as well as
 D3.js but not at all with OLAP engines (at least in their traditional form).

 Any ideas or suggestions?


 *// Adamantios*





Re: 2GB limit for partitions?

2015-02-03 Thread Mridul Muralidharan
That is fairly out of date (we used to run some of our jobs on it ... But
that is forked off 1.1 actually).

Regards
Mridul

On Tuesday, February 3, 2015, Imran Rashid iras...@cloudera.com wrote:

 Thanks for the explanations, makes sense.  For the record looks like this
 was worked on a while back (and maybe the work is even close to a
 solution?)

 https://issues.apache.org/jira/browse/SPARK-1476

 and perhaps an independent solution was worked on here?

 https://issues.apache.org/jira/browse/SPARK-1391


 On Tue, Feb 3, 2015 at 5:20 PM, Reynold Xin r...@databricks.com
 javascript:; wrote:

  cc dev list
 
 
  How are you saving the data? There are two relevant 2GB limits:
 
  1. Caching
 
  2. Shuffle
 
 
  For caching, a partition is turned into a single block.
 
  For shuffle, each map partition is partitioned into R blocks, where R =
  number of reduce tasks. It is unlikely a shuffle block  2G, although it
  can still happen.
 
  I think the 2nd problem is easier to fix than the 1st, because we can
  handle that in the network transport layer. It'd require us to divide the
  transfer of a very large block into multiple smaller blocks.
 
 
 
  On Tue, Feb 3, 2015 at 3:00 PM, Imran Rashid iras...@cloudera.com
 javascript:; wrote:
 
  Michael,
 
  you are right, there is definitely some limit at 2GB.  Here is a trivial
  example to demonstrate it:
 
  import org.apache.spark.storage.StorageLevel
  val d = sc.parallelize(1 to 1e6.toInt, 1).map{i = new
  Array[Byte](5e3.toInt)}.persist(StorageLevel.DISK_ONLY)
  d.count()
 
  It gives the same error you are observing.  I was under the same
  impression as Sean about the limits only being on blocks, not
 partitions --
  but clearly that isn't the case here.
 
  I don't know the whole story yet, but I just wanted to at least let you
  know you aren't crazy :)
  At the very least this suggests that you might need to make smaller
  partitions for now.
 
  Imran
 
 
  On Tue, Feb 3, 2015 at 4:58 AM, Michael Albert 
  m_albert...@yahoo.com.invalid wrote:
 
  Greetings!
 
  Thanks for the response.
 
  Below is an example of the exception I saw.
  I'd rather not post code at the moment, so I realize it is completely
  unreasonable to ask for a diagnosis.
  However, I will say that adding a partitionBy() was the last change
  before this error was created.
 
 
  Thanks for your time and any thoughts you might have.
 
  Sincerely,
   Mike
 
 
 
  Exception in thread main org.apache.spark.SparkException: Job aborted
  due to stage failure: Task 4 in stage 5.0 failed 4 times, most recent
  failure: Lost task 4.3 in stage 5.0 (TID 6012,
  ip-10-171-0-31.ec2.internal): java.lang.RuntimeException:
  java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
  at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
  at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
  at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
  at
 
 org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
  at
 
 org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307)
  at
 
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
  at
 
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
  at
 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at
 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at
 
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at
 scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
  at
  scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
  at
 
 org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
 
 
--
   *From:* Sean Owen so...@cloudera.com javascript:;
  *To:* Michael Albert m_albert...@yahoo.com javascript:;
  *Cc:* user@spark.apache.org javascript:; user@spark.apache.org
 javascript:;
  *Sent:* Monday, February 2, 2015 10:13 PM
  *Subject:* Re: 2GB limit for partitions?
 
  The limit is on blocks, not partitions. Partitions have many blocks.
 
  It sounds like you are creating very large values in memory, but I'm
  not sure given your description. You will run into problems if a
  single object is more than 2GB, of course. More of the stack trace
  might show what is mapping that much memory.
 
  If you simply want data into 1000 files it's a lot simpler. Just
  repartition into 1000 partitions and save the data. If you need more
  control over what goes into which partition, use a Partitioner, yes.
 
 
 
  On Mon, Feb 2, 2015 at 8:40 PM, Michael Albert
  m_albert...@yahoo.com.invalid wrote:
   Greetings!
  
   SPARK-1476 says that there is a 2G limit for blocks.
   Is