Re: “mapreduce.job.user.classpath.first” for Spark
Try spark.yarn.user.classpath.first (see https://issues.apache.org/jira/browse/SPARK-2996 - only works for YARN). Also thread at http://apache-spark-user-list.1001560.n3.nabble.com/netty-on-classpath-when-using-spark-submit-td18030.html. HTH, Markus On 02/03/2015 11:20 PM, Corey Nolet wrote: I'm having a really bad dependency conflict right now with Guava versions between my Spark application in YARN and (I believe) Hadoop's version. The problem is, my driver has the version of Guava which my application is expecting (15.0) while it appears the Spark executors that are working on my RDDs have a much older version (assuming it's the old version on the Hadoop classpath). Is there a property like 'mapreduce.job.user.classpath.first' that I can set to make sure my own classpath is established first on the executors?
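For reference, a hedged sketch of setting that property at submit time (property name per SPARK-2996; whether and where it takes effect depends on your Spark version, and it applies only on YARN; class and jar names are placeholders):

spark-submit --master yarn-cluster \
  --conf spark.yarn.user.classpath.first=true \
  --class com.example.MyApp my-app.jar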
Re: “mapreduce.job.user.classpath.first” for Spark
Corey, Which version of Spark do you use? I am using Spark 1.2.0 and Guava 15.0. It seems fine. Best, Bo On Tue, Feb 3, 2015 at 8:56 PM, M. Dale medal...@yahoo.com.invalid wrote: [...]
“mapreduce.job.user.classpath.first” for Spark
I'm having a really bad dependency conflict right now with Guava versions between my Spark application in YARN and (I believe) Hadoop's version. The problem is, my driver has the version of Guava which my application is expecting (15.0) while it appears the Spark executors that are working on my RDDs have a much older version (assuming it's the old version on the Hadoop classpath). Is there a property like 'mapreduce.job.user.classpath.first' that I can set to make sure my own classpath is established first on the executors?
HiveContext in SparkSQL - concurrency issues
Hi, I've been trying to use HiveContext (instead of SQLContext) in my Spark SQL application, and when I run multiple instances of the application simultaneously, only the first call works and every other call throws the following error: ERROR Datastore.Schema: Failed initialising database. Failed to start database 'metastore_db' with class loader sun.misc.Launcher$AppClassLoader@3s405f32, see the next exception for details. org.datanucleus.exceptions.NucleusDataStoreException: Failed to start database 'metastore_db' with class loader sun.misc.Launcher$AppClassLoader@3s405f32, see the next exception for details. at org.datanucleus.store.rdbms.ConnectionFactoryImpl$ManagedConnectionImpl.getConnection(ConnectionFactoryImpl.java:516) at org.datanucleus.store.rdbms.RDBMSStoreManager.init(RDBMSStoreManager.java:298) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at org.datanucleus.plugin.NonManagedPluginRegistry.createExecutableExtension(NonManagedPluginRegistry.java:631) at org.datanucleus.plugin.PluginManager.createExecutableExtension(PluginManager.java:301) at org.datanucleus.NucleusContext.createStoreManagerForProperties(NucleusContext.java:1187) at org.datanucleus.NucleusContext.initialise(NucleusContext.java:356) at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.freezeConfiguration(JDOPersistenceManagerFactory.java:775) at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.createPersistenceManagerFactory(JDOPersistenceManagerFactory.java:333) at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.getPersistenceManagerFactory(JDOPersistenceManagerFactory.java:202) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at javax.jdo.JDOHelper$16.run(JDOHelper.java:1965) at java.security.AccessController.doPrivileged(Native Method) at javax.jdo.JDOHelper.invoke(JDOHelper.java:1960) at javax.jdo.JDOHelper.invokeGetPersistenceManagerFactoryOnImplementation(JDOHelper.java:1166) at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:808) at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:701) at org.apache.hadoop.hive.metastore.ObjectStore.getPMF(ObjectStore.java:310) at org.apache.hadoop.hive.metastore.ObjectStore.getPersistenceManager(ObjectStore.java:339) at org.apache.hadoop.hive.metastore.ObjectStore.initialize(ObjectStore.java:248) at org.apache.hadoop.hive.metastore.ObjectStore.setConf(ObjectStore.java:223) at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:62) at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:117) at org.apache.hadoop.hive.metastore.RawStoreProxy.init(RawStoreProxy.java:58) at org.apache.hadoop.hive.metastore.RawStoreProxy.getProxy(RawStoreProxy.java:67) at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.newRawStore(HiveMetaStore.java:497) at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.getMS(HiveMetaStore.java:475) at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefaultDB(HiveMetaStore.java:523) at 
org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(HiveMetaStore.java:397) at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(HiveMetaStore.java:356) at org.apache.hadoop.hive.metastore.RetryingHMSHandler.init(RetryingHMSHandler.java:54) at org.apache.hadoop.hive.metastore.RetryingHMSHandler.getProxy(RetryingHMSHandler.java:59) at org.apache.hadoop.hive.metastore.HiveMetaStore.newHMSHandler(HiveMetaStore.java:4944) at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.init(HiveMetaStoreClient.java:171) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1410) at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.init(RetryingMetaStoreClient.java:62) at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:72) at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:2453) at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:2465) at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:340) at org.apache.spark.sql.hive.HiveContext$$anonfun$4.apply(HiveContext.scala:235) at
Multiple running SparkContexts detected in the same JVM!
I have a cluster running CDH 5.1.0 with the Spark component. Because the default version of Spark in CDH 5.1.0 is 1.0.0, while I want to use some features of Spark 1.2.0, I compiled another Spark with Maven. But when I launched spark-shell and created a new SparkContext, I got the error below: 15/02/04 14:08:19 WARN SparkContext: Multiple running SparkContexts detected in the same JVM! org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at ... I tried deleting the default Spark and setting the set("spark.driver.allowMultipleContexts", "true") option, but it didn't work. How could I fix it?
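For what it's worth, a hedged sketch of the usual way around this in the shell (rather than allowMultipleContexts, which is aimed at testing): stop the context the shell already created before building your own. App name and settings below are illustrative:

// spark-shell already provides a SparkContext as `sc`; only one may run per JVM
sc.stop()
val conf = new org.apache.spark.SparkConf().setAppName("my-shell-test")
val sc2 = new org.apache.spark.SparkContext(conf)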
Re: StackOverflowError on RDD.union
Use SparkContext#union[T](rdds: Seq[RDD[T]]) On Tue, Feb 3, 2015 at 7:43 PM, Thomas Kwan thomas.k...@manage.com wrote: [...]
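A hedged sketch of the difference (variable names illustrative): chaining RDD.union nests one UnionRDD per call, so the lineage depth grows with the number of RDDs and partition resolution recurses that deep, which matches the repeating frames above; SparkContext.union builds one flat UnionRDD:

val rdds = (1 to 1000).map(i => sc.parallelize(Seq(i))) // many small RDDs
// rdds.reduce(_ union _) would nest one UnionRDD per call: recursion depth ~1000
val combined = sc.union(rdds) // one flat UnionRDD over the whole Seq, shallow lineage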
Re: connector for CouchDB
Spark doesn't support it out of the box, but the connector is open source; you can get it from GitHub. The difference between these two DBs depends on what type of solution you are looking for. Please refer to this link: http://blog.nahurst.com/visual-guide-to-nosql-systems FYI, from the list of NoSQL systems in the above link, there are connectors available for MongoDB and Cassandra. For storing JSON, I think both systems support this.
StackOverflowError on RDD.union
I am trying to combine multiple RDDs into 1 RDD, and I am using the union function. I wonder if anyone has seen StackOverflowError as follows: Exception in thread main java.lang.StackOverflowError at org.apache.spark.rdd.RDD.partitions(RDD.scala:203) at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66) at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:203) at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66) at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203) at scala.Option.getOrElse(Option.scala:120)
Re: Spark (SQL) as OLAP engine
Hi Sean, I'm interested in trying something similar. How was your performance when you had many concurrent queries running against Spark? I know this will work well where you have a low volume of queries against a large dataset, but I am concerned about having a high volume of queries against the same large dataset. (I know I've not defined large, but hopefully you get the gist :)) I'm using Cassandra to handle workloads where we have a large volume of low-complexity queries, but want to move to an architecture which supports a similar(ish) large volume of higher-complexity queries. I'd like to use Spark as the query serving layer, but have concerns about how many concurrent queries it could handle. I'd be interested in anyone's thoughts or experience with this. Thanks, Andrew From: Sean McNamara sean.mcnam...@webtrends.com Date: Wednesday, February 4, 2015 at 1:01 To: Adamantios Corais adamantios.cor...@gmail.com Cc: user@spark.apache.org Subject: Re: Spark (SQL) as OLAP engine We have gone down a similar path at Webtrends, Spark has worked amazingly well for us in this use case. Our solution goes from REST, directly into Spark, and back out to the UI instantly. Here is the resulting product in case you are curious (and please pardon the self promotion): https://www.webtrends.com/support-training/training/explore-onboarding/ How can I automatically cache the data once a day... If you are not memory-bounded you could easily cache the daily results for some span of time and re-union them together each time you add new data. You would service queries off the unioned RDD. ... and make them available on a web service From the unioned RDD you could always step into Spark SQL at that point. Or you could use a simple scatter/gather pattern for this. As with all things Spark, this is super easy to do: just use aggregate()()! Cheers, Sean On Feb 3, 2015, at 9:59 AM, Adamantios Corais adamantios.cor...@gmail.com wrote: [...]
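A hedged sketch of the cache-and-re-union pattern Sean describes (the types and method names are illustrative, not from his system):

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

case class DailyAgg(key: String, value: Long)

// one cached RDD per day, unioned into a single serving RDD
var days: Seq[RDD[DailyAgg]] = Seq.empty

def addDay(newDay: RDD[DailyAgg]): Unit = {
  days = days :+ newDay.cache() // cached so repeated queries don't recompute the day
}

def servingRDD(sc: SparkContext): RDD[DailyAgg] = sc.union(days)

// queries (plain Spark, aggregate(), or Spark SQL after registering a table) run against servingRDD(sc)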
Re: Fail to launch spark-shell on windows 2008 R2
Hi Ningjun, I have been working with Spark 1.2 on Windows 7 and Windows 2008 R2 (purely for development purposes). I most recently installed them using Java 1.8, Scala 2.10.4, and Spark 1.2 precompiled for Hadoop 2.4+. The null\bin\winutils issue is addressed in an earlier thread at: http://apache-spark-user-list.1001560.n3.nabble.com/Run-spark-unit-test-on-Windows-7-td8656.html Hope this helps a little bit! Denny On Tue Feb 03 2015 at 8:24:24 AM Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote: [...]
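For the winutils issue Denny mentions, a hedged sketch of a commonly cited workaround (the property name hadoop.home.dir is what Hadoop's Shell utility checks; the path is illustrative and its bin directory must contain winutils.exe):

// set before the SparkContext is created, e.g. first thing in the driver
System.setProperty("hadoop.home.dir", "C:\\hadoop") // expects C:\hadoop\bin\winutils.exe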
Spark SQL taking long time to print records from a table
I have 3 text files in HDFS which I am reading using Spark SQL and registering as tables. After that I am doing almost 5-6 operations, including joins, group by, etc. And this whole process takes hardly 6-7 secs (source file size: 3 GB, almost 20 million rows). As the final step of my computation, I am expecting only 1 record in my final RDD, named acctNPIScr in the code snippet below. My question here is that when I am trying to print this RDD, either by registering it as a table and printing records from the table, or by this method - acctNPIScr.map(t => "Score: " + t(1)).collect().foreach(println) - it takes a very long time, almost 1.5 minutes, to print 1 record. Can someone please tell me if I am doing something wrong in printing? What is the best way to print the final result from a SchemaRDD? val acctNPIScr = sqlContext.sql("SELECT party_id, sum(npi_int)/sum(device_priority_new) as npi_score FROM AcctNPIScoreTemp group by party_id") acctNPIScr.registerTempTable("AcctNPIScore") val endtime = System.currentTimeMillis() logger.info("Total sql Time : " + (endtime - st)) // this time is hardly 5 secs println("start printing") val result = sqlContext.sql("SELECT * FROM AcctNPIScore").collect().foreach(println) //acctNPIScr.map(t => "Score: " + t(1)).collect().foreach(println) logger.info("Total printing Time : " + (System.currentTimeMillis() - endtime)) // printing one record takes almost 1.5 minutes
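As context for what the snippet above measures, a minimal hedged illustration (schematic table and column names): a Spark SQL query is lazy and only builds a plan, so if no action ran earlier, the 6-7 second figure covers plan construction only; the full DAG (reads, joins, group-bys) executes when an action such as collect() runs, and the action's wall time therefore includes all of the upstream work, not just the printing:

val scored = sqlContext.sql("SELECT key, sum(v) FROM t GROUP BY key") // returns quickly: only builds a plan
val rows = scored.collect() // triggers the actual reads, joins, and aggregation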
Re: How to define a file filter for file name patterns in Apache Spark Streaming in Java?
Hello Akhil, Thank you for taking your time for a detailed answer. I managed to solve it in a very similar manner. Kind regards, Emre Sevinç On Mon, Feb 2, 2015 at 8:22 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Hi Emre, This is how you do that in Scala: val lines = ssc.fileStream[LongWritable, Text, TextInputFormat]("/home/akhld/sigmoid", (t: Path) => true, true) In Java you can do something like: jssc.ssc().<LongWritable, Text, SequenceFileInputFormat>fileStream("/home/akhld/sigmoid", new AbstractFunction1<Path, Object>() { @Override public Boolean apply(Path input) { // file filtering logic here return true; } }, true, ClassTag$.MODULE$.apply(LongWritable.class), ClassTag$.MODULE$.apply(Text.class), ClassTag$.MODULE$.apply(SequenceFileInputFormat.class)); Thanks Best Regards On Mon, Feb 2, 2015 at 6:34 PM, Emre Sevinc emre.sev...@gmail.com wrote: Hello, I'm using Apache Spark Streaming 1.2.0 and trying to define a file filter for file names when creating an InputDStream (https://spark.apache.org/docs/1.2.0/api/java/org/apache/spark/streaming/dstream/InputDStream.html) by invoking the fileStream method (https://spark.apache.org/docs/1.2.0/api/java/org/apache/spark/streaming/StreamingContext.html#fileStream%28java.lang.String,%20scala.Function1,%20boolean,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag%29). My code is working perfectly fine when I don't use a file filter, e.g. by invoking the other fileStream method (described at https://spark.apache.org/docs/1.2.0/api/java/org/apache/spark/streaming/StreamingContext.html#fileStream%28java.lang.String,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag%29). According to the documentation of the fileStream method, I can pass it a scala.Function1<org.apache.hadoop.fs.Path, Object> filter. But so far, I could not create a fileFilter. My initial attempts have been: 1- Tried to implement it as: Function1<Path, Object> fileFilter = new Function1<Path, Object>() { @Override public Object apply(Path v1) { return true; } @Override public <A> Function1<A, Object> compose(Function1<A, Path> g) { return Function1$class.compose(this, g); } @Override public <A> Function1<Path, A> andThen(Function1<Object, A> g) { return Function1$class.andThen(this, g); } }; But apparently my implementation of andThen is wrong, and I couldn't understand how I should implement it. 
It complains that the anonymous function is not abstract and does not override abstract method <A>andThen$mcVJ$sp(scala.Function1<scala.runtime.BoxedUnit,A>) in scala.Function1 2- Tried to implement it as: Function1<Path, Object> fileFilter = new AbstractFunction1<Path, Object>() { @Override public Object apply(Path v1) { return true; } }; This one compiles, but when I run it I get an exception: 2015-02-02 13:42:50 ERROR OneForOneStrategy:66 - myModule$1 java.io.NotSerializableException: myModule$1 at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441) at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply$mcV$sp(DStreamGraph.scala:169) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:985) at org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:164) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at
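For what it's worth, a hedged sketch of a common fix for the NotSerializableException in attempt 2: the anonymous AbstractFunction1 holds a reference to its enclosing class (myModule), which is not serializable. A standalone named class avoids that capture (the class name and filtering policy below are illustrative):

public class HiddenPathFilter extends scala.runtime.AbstractFunction1<org.apache.hadoop.fs.Path, Object>
        implements java.io.Serializable {
    @Override
    public Object apply(org.apache.hadoop.fs.Path path) {
        // example policy: skip hidden files
        return !path.getName().startsWith(".");
    }
}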
Re: Spark Master Build Failing to run on cluster in standalone ClassNotFoundException: javax.servlet.FilterRegistration
Already come up several times today: https://issues.apache.org/jira/browse/SPARK-5557 On Tue, Feb 3, 2015 at 8:04 AM, Night Wolf nightwolf...@gmail.com wrote: [...]
Is LogisticRegressionWithSGD in MLlib scalable?
Hi Everyone, Is LogisticRegressionWithSGD in MLlib scalable? If so, what is the idea behind the scalable implementation? Thanks in advance, Peng
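For readers unfamiliar with the API being asked about, a minimal hedged usage sketch (MLlib's RDD-based API as of Spark 1.x; the data path is illustrative):

import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
import org.apache.spark.mllib.util.MLUtils

// training data distributed as an RDD[LabeledPoint]
val training = MLUtils.loadLibSVMFile(sc, "data/sample_libsvm_data.txt")
// each of the 100 iterations computes a (mini-batch) gradient across the cluster
val model = LogisticRegressionWithSGD.train(training, 100)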
Unable to run spark-shell after build
Hi all, I'm trying to run the master version of Spark in order to test some alpha components in the ml package. I followed the build documentation and built it with: $ mvn clean package The build is successful, but when I try to run spark-shell I get the following error: Exception in thread main java.lang.NoClassDefFoundError: javax/servlet/http/HttpServletResponse at org.apache.spark.HttpServer.org$apache$spark$HttpServer$$doStart(HttpServer.scala:74) at org.apache.spark.HttpServer$$anonfun$1.apply(HttpServer.scala:61) at org.apache.spark.HttpServer$$anonfun$1.apply(HttpServer.scala:61) at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1732) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1723) at org.apache.spark.HttpServer.start(HttpServer.scala:61) at org.apache.spark.repl.SparkIMain.init(SparkIMain.scala:130) at org.apache.spark.repl.SparkILoop$SparkILoopInterpreter.init(SparkILoop.scala:185) at org.apache.spark.repl.SparkILoop.createInterpreter(SparkILoop.scala:214) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:946) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:942) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:942) at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:942) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1039) at org.apache.spark.repl.Main$.main(Main.scala:31) at org.apache.spark.repl.Main.main(Main.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:403) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:77) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.lang.ClassNotFoundException: javax.servlet.http.HttpServletResponse at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) ... 25 more What's going wrong? Jao
Pig loader in Spark
Hi, Has anyone implemented the default Pig loader in Spark (i.e. loading delimited text files with .pig_schema)? Thanks, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
connecting spark with ActiveMQ
Hi All, I have a requirement where I need to consume messages from ActiveMQ and do both live stream processing and batch processing using Spark. Is there a Spark plugin or library that enables this? If not, do you know any other way this could be done? Regards Mohit
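One commonly suggested approach is a custom Spark Streaming receiver wrapping the JMS API. A hedged sketch under that assumption (class name, broker URL, queue name, and storage level are illustrative, not from an existing connector):

import javax.jms.{Connection, Message, MessageListener, Session, TextMessage}
import org.apache.activemq.ActiveMQConnectionFactory
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class ActiveMQReceiver(brokerUrl: String, queueName: String)
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  private var connection: Connection = _

  override def onStart(): Unit = {
    val factory = new ActiveMQConnectionFactory(brokerUrl)
    connection = factory.createConnection()
    val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
    val consumer = session.createConsumer(session.createQueue(queueName))
    consumer.setMessageListener(new MessageListener {
      override def onMessage(msg: Message): Unit = msg match {
        case t: TextMessage => store(t.getText) // hand each message body to Spark
        case _ => // ignore non-text messages in this sketch
      }
    })
    connection.start()
  }

  override def onStop(): Unit = if (connection != null) connection.close()
}

// usage: val lines = ssc.receiverStream(new ActiveMQReceiver("tcp://localhost:61616", "myQueue"))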
RE: Sort based shuffle not working properly?
Nitin, Suing Spark is not going to help. Perhaps you should sue someone else :-) Just kidding! Mohammed -----Original Message----- From: nitinkak001 [mailto:nitinkak...@gmail.com] Sent: Tuesday, February 3, 2015 1:57 PM To: user@spark.apache.org Subject: Re: Sort based shuffle not working properly? Just to add, I am suing Spark 1.1.0
Re: Sort based shuffle not working properly?
Hm, I don't think the sort partitioner is going to cause the result to be ordered by c1,c2 if you only partitioned on c1. I mean, it's not even guaranteed that the type of c2 has an ordering, right? On Tue, Feb 3, 2015 at 3:38 PM, nitinkak001 nitinkak...@gmail.com wrote: I am trying to implement secondary sort in Spark as we do in MapReduce. Here is my data (tab separated; the header c1, c2, c3 is not part of the file): c1 c2 c3 1 2 4 1 3 6 2 4 7 2 6 8 3 5 5 3 1 8 3 2 0 To do the secondary sort, I create a paired RDD as ((c1 + "," + c2), row) and then use a custom partitioner to partition only on c1. I have set spark.shuffle.manager = SORT so the keys per partition are sorted. For the key 3 I am expecting to get (3, 1) (3, 2) (3, 5) but am still getting the original order 3,5 3,1 3,2. Here is the custom partitioner code: class StraightPartitioner(p: Int) extends org.apache.spark.Partitioner { def numPartitions = p def getPartition(key: Any) = { key.asInstanceOf[String].split(",")(0).toInt } } and the driver code, please tell me what I am doing wrong: val conf = new SparkConf().setAppName("MapInheritanceExample") conf.set("spark.shuffle.manager", "SORT") val sc = new SparkContext(conf) val pF = sc.textFile(inputFile) val log = LogFactory.getLog("MapFunctionTest") val partitionedRDD = pF.map { x => val arr = x.split("\t"); (arr(0) + "," + arr(1), null) }.partitionBy(new StraightPartitioner(10)) var outputRDD = partitionedRDD.mapPartitions(p => { p.map({ case (o, n) => o }) })
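For reference, a hedged sketch of one way to get a guaranteed secondary sort on Spark 1.2+: repartitionAndSortWithinPartitions partitions by the supplied partitioner and sorts each partition by the full key, so partitioning on c1 while ordering on (c1, c2) yields the expected output (the case class, ordering, and partition count below are illustrative):

import org.apache.spark.SparkContext._ // pair-RDD functions on Spark 1.2

case class CKey(c1: Int, c2: Int)
implicit val ckeyOrdering: Ordering[CKey] = Ordering.by(k => (k.c1, k.c2))

class C1Partitioner(override val numPartitions: Int) extends org.apache.spark.Partitioner {
  // partition on c1 only, normalized to a non-negative bucket
  def getPartition(key: Any): Int = {
    val c1 = key.asInstanceOf[CKey].c1
    ((c1 % numPartitions) + numPartitions) % numPartitions
  }
}

val pairs = pF.map { line =>
  val a = line.split("\t")
  (CKey(a(0).toInt, a(1).toInt), line)
}
// each partition holds whole c1 groups, sorted by (c1, c2)
val sorted = pairs.repartitionAndSortWithinPartitions(new C1Partitioner(10))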
RE: Fail to launch spark-shell on windows 2008 R2
Hi Gen, Thanks for your feedback. We do have a business reason to run Spark on Windows. We have an existing application built on C# .NET running on Windows. We are considering adding Spark to the application for parallel processing of large data. We want Spark to run on Windows so it integrates with our existing app easily. Has anybody used Spark on Windows for a production system? Is Spark reliable on Windows? Ningjun From: gen tang [mailto:gen.tan...@gmail.com] Sent: Thursday, January 29, 2015 12:53 PM To: Wang, Ningjun (LNG-NPV) Cc: user@spark.apache.org Subject: Re: Fail to launch spark-shell on windows 2008 R2 Hi, Using Spark under Windows is a really bad idea, because even if you solve the Hadoop problems, you will probably meet java.net.SocketException: connection reset by peer. It is caused by the fact that we request socket ports too frequently under Windows. To my knowledge, it is really difficult to solve. And you will find something really funny: the same code sometimes works and sometimes doesn't, even in shell mode. And I am sorry, but I don't see the point of running Spark under Windows, let alone using the local file system, in a business environment. Do you have a cluster on Windows? FYI, I have used Spark prebuilt for Hadoop 1 under Windows 7 and there is no problem launching it, but it has the java.net.SocketException problem. If you are using Spark prebuilt for Hadoop 2, you should consider following the solution provided by https://issues.apache.org/jira/browse/SPARK-2356 Cheers Gen On Thu, Jan 29, 2015 at 5:54 PM, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote: Install VirtualBox and run Linux? That does not help us. We have a business reason to run it on a Windows operating system, e.g. Windows 2008 R2. If anybody has done that, please give some advice on what version of Spark to use, which version of Hadoop to build Spark against, etc. Note that we only use the local file system and do not have any HDFS file system at all. I don't understand why Spark generates so many Hadoop errors while we don't even need HDFS. Ningjun From: gen tang [mailto:gen.tan...@gmail.com] Sent: Thursday, January 29, 2015 10:45 AM To: Wang, Ningjun (LNG-NPV) Cc: user@spark.apache.org Subject: Re: Fail to launch spark-shell on windows 2008 R2 Hi, I tried to use Spark under Windows once. However, the only solution that I found was to install VirtualBox. Hope this can help you. Best Gen On Thu, Jan 29, 2015 at 4:18 PM, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote: I deployed spark-1.1.0 on Windows 7 and was able to launch the spark-shell. 
I then deployed it to Windows 2008 R2 and launched the spark-shell, and I got the error java.lang.RuntimeException: Error while running command to get file permissions : java.io.IOException: Cannot run program ls: CreateProcess error=2, The system cannot find the file specified at java.lang.ProcessBuilder.start(Unknown Source) at org.apache.hadoop.util.Shell.runCommand(Shell.java:200) at org.apache.hadoop.util.Shell.run(Shell.java:182) at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:375) at org.apache.hadoop.util.Shell.execCommand(Shell.java:461) at org.apache.hadoop.util.Shell.execCommand(Shell.java:444) at org.apache.hadoop.fs.FileUtil.execCommand(FileUtil.java:710) at org.apache.hadoop.fs.RawLocalFileSystem$RawLocalFileStatus.loadPermissionInfo(RawLocalFileSystem.java:443) at org.apache.hadoop.fs.RawLocalFileSystem$RawLocalFileStatus.getPermission(RawLocalFileSystem.java:418) Here is the detailed output: C:\spark-1.1.0\bin> spark-shell 15/01/29 10:13:13 INFO SecurityManager: Changing view acls to: ningjun.wang, 15/01/29 10:13:13 INFO SecurityManager: Changing modify acls to: ningjun.wang, 15/01/29 10:13:13 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(ningjun.wang, ); users with modify permissions: Set(ningjun.wang, ) 15/01/29 10:13:13 INFO HttpServer: Starting HTTP Server 15/01/29 10:13:14 INFO Server: jetty-8.y.z-SNAPSHOT 15/01/29 10:13:14 INFO AbstractConnector: Started SocketConnector@0.0.0.0:53692 15/01/29 10:13:14 INFO Utils: Successfully started service 'HTTP class server' on port 53692. Failed to created SparkJLineReader: java.lang.NoClassDefFoundError: Could not initialize class org.fusesource.jansi.internal.Kernel32 Falling back to SimpleReader. Welcome to [Spark ASCII-art banner] version 1.1.0 Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71) Type in expressions to have them evaluated. Type :help for more information. 15/01/29 10:13:18 INFO
Re: ephemeral-hdfs vs persistent-hdfs - performance
You could also just push the data to Amazon S3, which would un-link the size of the cluster needed to process the data from the size of the data. DR On 02/03/2015 11:43 AM, Joe Wass wrote: [...]
Spark (SQL) as OLAP engine
Hi, After some research I have decided that Spark (SQL) would be ideal for building an OLAP engine. My goal is to push aggregated data (to Cassandra or other low-latency data storage) and then be able to project the results on a web page (web service). New data will be added (aggregated) once a day, only. On the other hand, the web service must be able to run some fixed(?) queries (either on Spark or Spark SQL) at any time and plot the results with D3.js. Note that I can already achieve similar speeds while in REPL mode by caching the data. Therefore, I believe that my problem must be re-phrased as follows: How can I automatically cache the data once a day and make them available on a web service that is capable of running any Spark or Spark (SQL) statement in order to plot the results with D3.js? Note that I already have some experience in Spark (+Spark SQL) as well as D3.js, but not at all with OLAP engines (at least in their traditional form). Any ideas or suggestions? // Adamantios
Re: Spark on Yarn: java.lang.IllegalArgumentException: Invalid rule
The version I'm using was already pre-built for Hadoop 2.3.
ephemeral-hdfs vs persistent-hdfs - performance
I want to process about 800 GB of data on an Amazon EC2 cluster. So, I need to store the input in HDFS somehow. I currently have a cluster of 5 x m3.xlarge, each of which has 80GB disk. Each HDFS node reports 73 GB, and the total capacity is ~370 GB. If I want to process 800 GB of data (assuming I can't split the jobs up), I'm guessing I need to get persistent-hdfs involved. 1 - Does persistent-hdfs have noticeably different performance than ephemeral-hdfs? 2 - If so, is there a recommended configuration (like storing input and output on persistent, but persisted RDDs on ephemeral?) This seems like a common use-case, so sorry if this has already been covered. Joe
Supported Notebooks (and other viz tools) for Spark 0.9.1?
Hi, I am using Spark 0.9.1 and I am looking for a proper viz tool that supports that specific version. As far as I have seen, all relevant tools (e.g. spark-notebook, zeppelin-project, etc.) only support 1.1 or 1.2; there is no mention of older versions of Spark. Any ideas or suggestions? // Adamantios
Re: Writing RDD to a csv file
This is more of a Scala question, so next time you may want to address a Scala forum, e.g. http://stackoverflow.com/questions/tagged/scala val optArrStr: Option[Array[String]] = ??? optArrStr.map(arr => arr.mkString(",")).getOrElse("") // empty string or whatever default value you have for this kr, Gerard. On Tue, Feb 3, 2015 at 2:09 PM, kundan kumar iitr.kun...@gmail.com wrote: [...]
Writing RDD to a csv file
I have an RDD which is of type org.apache.spark.rdd.RDD[(String, (Array[String], Option[Array[String]]))] I want to write it as a csv file. Please suggest how this can be done. myrdd.map(line => (line._1 + "," + line._2._1.mkString(",") + "," + line._2._2.mkString(','))).saveAsTextFile("hdfs://...") Doing mkString on line._2._1 works but does not work for the Option type. Please suggest how this can be done. Thanks Kundan
Re: Writing RDD to a csv file
Thanks Gerard !! This is working. On Tue, Feb 3, 2015 at 6:44 PM, Gerard Maas gerard.m...@gmail.com wrote: [...]
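Putting the two together, a hedged sketch of the full write (falling back to an empty string for the missing Option, per Gerard's suggestion):

myrdd.map { case (key, (arr, optArr)) =>
  key + "," + arr.mkString(",") + "," + optArr.map(_.mkString(",")).getOrElse("")
}.saveAsTextFile("hdfs://...") // output path elided as in the original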
Re: Supported Notebooks (and other viz tools) for Spark 0.9.1?
Hello Adamantios, Thanks for the poke and the interest. Actually, you're the second person asking about backporting it. Yesterday (late), I created a branch for it... and the simple local Spark test worked! \o/ However, it'll be the 'old' UI :-/ since I haven't ported the code using Play 2.2.6 to the new UI. FYI: Play 2.2.6 uses a compliant Akka version, that's why I mention it. It was too late for a push :-D, so I'll commit and push this evening. At least you can try it tomorrow. I shall be on gitter this evening as well if there are questions: https://gitter.im/andypetrella/spark-notebook Cheers, andy On Tue Feb 03 2015 at 2:05:35 PM Adamantios Corais adamantios.cor...@gmail.com wrote: [...]
Spark Master Build Failing to run on cluster in standalone ClassNotFoundException: javax.servlet.FilterRegistration
Hi, I just built Spark 1.3 master using Maven via make-distribution.sh: ./make-distribution.sh --name mapr3 --skip-java-test --tgz -Pmapr3 -Phive -Phive-thriftserver -Phive-0.12.0 When trying to start the standalone Spark master on a cluster I get the following stack trace: 15/02/04 08:53:56 INFO slf4j.Slf4jLogger: Slf4jLogger started 15/02/04 08:53:56 INFO Remoting: Starting remoting 15/02/04 08:53:56 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkMaster@hadoop-009:7077] 15/02/04 08:53:56 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkMaster@hadoop-009:7077] ...skipping... at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at akka.util.Reflect$.instantiate(Reflect.scala:66) at akka.actor.ArgsReflectConstructor.produce(Props.scala:352) at akka.actor.Props.newActor(Props.scala:252) at akka.actor.ActorCell.newActor(ActorCell.scala:552) at akka.actor.ActorCell.create(ActorCell.scala:578) ... 9 more Caused by: java.lang.NoClassDefFoundError: javax/servlet/FilterRegistration at org.spark-project.jetty.servlet.ServletContextHandler.init(ServletContextHandler.java:136) at org.spark-project.jetty.servlet.ServletContextHandler.init(ServletContextHandler.java:129) at org.spark-project.jetty.servlet.ServletContextHandler.init(ServletContextHandler.java:98) at org.apache.spark.ui.JettyUtils$.createServletHandler(JettyUtils.scala:96) at org.apache.spark.ui.JettyUtils$.createServletHandler(JettyUtils.scala:87) at org.apache.spark.ui.WebUI.attachPage(WebUI.scala:67) at org.apache.spark.deploy.master.ui.MasterWebUI.initialize(MasterWebUI.scala:40) at org.apache.spark.deploy.master.ui.MasterWebUI.init(MasterWebUI.scala:36) at org.apache.spark.deploy.master.Master.init(Master.scala:95) ... 18 more Caused by: java.lang.ClassNotFoundException: javax.servlet.FilterRegistration at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) ... 27 more The distro seems about the right size (260 MB), so I don't imagine any of the libraries are missing. The above command worked on 1.2... Any ideas what's going wrong? Cheers, N
Re: Unable to run spark-shell after build
Yes, I see this too. I think the Jetty shading still needs a tweak. It's not finding the servlet API classes. Let's converge on SPARK-5557 to discuss. On Tue, Feb 3, 2015 at 2:04 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: [...]
Re: GraphX: ShortestPaths does not terminate on a grid graph
I think this is a separate issue with how the EdgeRDDImpl partitions edges. If you can merge this change in and rebuild, it should work: https://github.com/apache/spark/pull/4136/files If you can't, I just called the Graph.partitionBy() method right after constructing my graph, but before performing any operations on it. That way, the EdgeRDDImpl class doesn't have to use the default partitioner. Hope this helps! Jay On Tue Feb 03 2015 at 12:35:14 AM NicolasC nicolas.ch...@inria.fr wrote: On 01/29/2015 08:31 PM, Ankur Dave wrote: Thanks for the reminder. I just created a PR: https://github.com/apache/spark/pull/4273 Ankur Hello, Thanks for the patch. I applied it on Pregel.scala (in the Spark 1.2.0 sources) and rebuilt Spark. During execution, at the 25th iteration of Pregel, checkpointing is done and then it throws the following exception: Exception in thread "main" org.apache.spark.SparkException: Checkpoint RDD CheckpointRDD[521] at reduce at VertexRDDImpl.scala:80(0) has different number of partitions than original RDD VertexRDD ZippedPartitionsRDD2[518] at zipPartitions at VertexRDDImpl.scala:170(2) at org.apache.spark.rdd.RDDCheckpointData.doCheckpoint(RDDCheckpointData.scala:98) at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1279) at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1281) at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1281) at scala.collection.immutable.List.foreach(List.scala:318) at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1281) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1285) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1351) at org.apache.spark.rdd.RDD.reduce(RDD.scala:867) at org.apache.spark.graphx.impl.VertexRDDImpl.count(VertexRDDImpl.scala:80) at org.apache.spark.graphx.Pregel$.apply(Pregel.scala:155) at org.apache.spark.graphx.lib.ShortestPaths$.run(ShortestPaths.scala:69) Pregel.scala:155 is the following line in the Pregel loop: activeMessages = messages.count()
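A hedged sketch of the partitionBy workaround Jay describes (the graph source, partition strategy, and landmarks are illustrative; the point is calling partitionBy before any other operations on the graph):

import org.apache.spark.graphx._

val raw = GraphLoader.edgeListFile(sc, "grid-edges.txt")
// choose an explicit partition strategy so EdgeRDDImpl doesn't fall back to the default partitioner
val graph = raw.partitionBy(PartitionStrategy.EdgePartition2D)
val landmarks = Seq(0L, 42L) // illustrative vertex ids
val result = lib.ShortestPaths.run(graph, landmarks)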
Re: ephemeral-hdfs vs persistent-hdfs - performance
The data is coming from S3 in the first place, and the results will be uploaded back there. But even in the same availability zone, fetching 170 GB (that's gzipped) is slow. From what I understand of the pipelines, multiple transforms on the same RDD might involve re-reading the input, which very quickly adds up in comparison to having the data locally. Unless I persist the data (which I am in fact doing), but that would involve storing approximately the same amount of data in HDFS, which wouldn't fit. Also, I understood that S3 was unsuitable for practical use? See Why you cannot use S3 as a replacement for HDFS[0]. I'd love to be proved wrong, though, that would make things a lot easier. [0] http://wiki.apache.org/hadoop/AmazonS3 On 3 February 2015 at 16:45, David Rosenstrauch dar...@darose.net wrote: You could also just push the data to Amazon S3, which would un-link the size of the cluster needed to process the data from the size of the data. DR On 02/03/2015 11:43 AM, Joe Wass wrote: I want to process about 800 GB of data on an Amazon EC2 cluster. So, I need to store the input in HDFS somehow. I currently have a cluster of 5 x m3.xlarge, each of which has 80GB disk. Each HDFS node reports 73 GB, and the total capacity is ~370 GB. If I want to process 800 GB of data (assuming I can't split the jobs up), I'm guessing I need to get persistent-hdfs involved. 1 - Does persistent-hdfs have noticeably different performance than ephemeral-hdfs? 2 - If so, is there a recommended configuration (like storing input and output on persistent, but persisted RDDs on ephemeral?) This seems like a common use-case, so sorry if this has already been covered. Joe - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
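The persisting Joe mentions would look something like the sketch below. The path, the parse step (parseLine is hypothetical) and the storage level are placeholders:

  import org.apache.spark.storage.StorageLevel

  // Read the gzipped input from S3 once, then persist the parsed RDD so
  // later transforms don't re-read it from S3.
  val lines = sc.textFile("s3n://my-bucket/input/*.gz")
  val parsed = lines.map(parseLine).persist(StorageLevel.MEMORY_AND_DISK_SER)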
Re: ephemeral-hdfs vs persistent-hdfs - performance
We use S3 as the main storage for all our input data and our generated (output) data. (10's of terabytes of data daily.) We read gzipped data directly from S3 in our Hadoop/Spark jobs - it's not crazily slow, as long as you parallelize the work well by distributing the processing across enough machines. (About 100 nodes, in our case.) The way we generally operate, storage-wise, is: read input directly from s3, write output from Hadoop/Spark jobs to HDFS, then after the job is complete, distcp the relevant output from HDFS back to S3. Works for us ... YMMV. :-) HTH, DR On 02/03/2015 12:32 PM, Joe Wass wrote: The data is coming from S3 in the first place, and the results will be uploaded back there. But even in the same availability zone, fetching 170 GB (that's gzipped) is slow. From what I understand of the pipelines, multiple transforms on the same RDD might involve re-reading the input, which very quickly adds up in comparison to having the data locally. Unless I persist the data (which I am in fact doing), but that would involve storing approximately the same amount of data in HDFS, which wouldn't fit. Also, I understood that S3 was unsuitable for practical use? See Why you cannot use S3 as a replacement for HDFS[0]. I'd love to be proved wrong, though, that would make things a lot easier. [0] http://wiki.apache.org/hadoop/AmazonS3 On 3 February 2015 at 16:45, David Rosenstrauch dar...@darose.net wrote: You could also just push the data to Amazon S3, which would un-link the size of the cluster needed to process the data from the size of the data. DR On 02/03/2015 11:43 AM, Joe Wass wrote: I want to process about 800 GB of data on an Amazon EC2 cluster. So, I need to store the input in HDFS somehow. I currently have a cluster of 5 x m3.xlarge, each of which has 80GB disk. Each HDFS node reports 73 GB, and the total capacity is ~370 GB. If I want to process 800 GB of data (assuming I can't split the jobs up), I'm guessing I need to get persistent-hdfs involved. 1 - Does persistent-hdfs have noticeably different performance than ephemeral-hdfs? 2 - If so, is there a recommended configuration (like storing input and output on persistent, but persisted RDDs on ephemeral?) This seems like a common use-case, so sorry if this has already been covered. Joe - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
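In code, that pattern is roughly the sketch below (bucket and paths are placeholders, and the final distcp step happens outside Spark):

  // Read input straight from S3, write the job's output to HDFS; afterwards
  // `hadoop distcp` copies the relevant output from HDFS back to S3.
  val input = sc.textFile("s3n://my-bucket/input/*.gz")
  val result = input.filter(_.nonEmpty) // stand-in for the real job
  result.saveAsTextFile("hdfs:///jobs/run-1/output")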
Re: ephemeral-hdfs vs persistent-hdfs - performance
Using the s3a protocol (introduced in Hadoop 2.6.0) would be faster compared to s3. The upcoming Hadoop 2.7.0 contains some bug fixes for s3a. FYI On Tue, Feb 3, 2015 at 9:48 AM, David Rosenstrauch dar...@darose.net wrote: We use S3 as the main storage for all our input data and our generated (output) data. (10's of terabytes of data daily.) We read gzipped data directly from S3 in our Hadoop/Spark jobs - it's not crazily slow, as long as you parallelize the work well by distributing the processing across enough machines. (About 100 nodes, in our case.) The way we generally operate, storage-wise, is: read input directly from s3, write output from Hadoop/Spark jobs to HDFS, then after the job is complete, distcp the relevant output from HDFS back to S3. Works for us ... YMMV. :-) HTH, DR On 02/03/2015 12:32 PM, Joe Wass wrote: The data is coming from S3 in the first place, and the results will be uploaded back there. But even in the same availability zone, fetching 170 GB (that's gzipped) is slow. From what I understand of the pipelines, multiple transforms on the same RDD might involve re-reading the input, which very quickly adds up in comparison to having the data locally. Unless I persist the data (which I am in fact doing), but that would involve storing approximately the same amount of data in HDFS, which wouldn't fit. Also, I understood that S3 was unsuitable for practical use? See Why you cannot use S3 as a replacement for HDFS[0]. I'd love to be proved wrong, though, that would make things a lot easier. [0] http://wiki.apache.org/hadoop/AmazonS3 On 3 February 2015 at 16:45, David Rosenstrauch dar...@darose.net wrote: You could also just push the data to Amazon S3, which would un-link the size of the cluster needed to process the data from the size of the data. DR On 02/03/2015 11:43 AM, Joe Wass wrote: I want to process about 800 GB of data on an Amazon EC2 cluster. So, I need to store the input in HDFS somehow. I currently have a cluster of 5 x m3.xlarge, each of which has 80GB disk. Each HDFS node reports 73 GB, and the total capacity is ~370 GB. If I want to process 800 GB of data (assuming I can't split the jobs up), I'm guessing I need to get persistent-hdfs involved. 1 - Does persistent-hdfs have noticeably different performance than ephemeral-hdfs? 2 - If so, is there a recommended configuration (like storing input and output on persistent, but persisted RDDs on ephemeral?) This seems like a common use-case, so sorry if this has already been covered. Joe - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
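Switching to s3a is mostly a URI-scheme change, roughly as in the sketch below. The credential property names are the standard Hadoop 2.6-era ones, but treat the exact keys as assumptions, and note that s3a needs the hadoop-aws module on the classpath:

  sc.hadoopConfiguration.set("fs.s3a.access.key", "...")
  sc.hadoopConfiguration.set("fs.s3a.secret.key", "...")
  val input = sc.textFile("s3a://my-bucket/input/*.gz")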
Re: Error in saving schemaRDD with Decimal as Parquet
Hi, any thoughts? Thanks, On Sun, Feb 1, 2015 at 12:26 PM, Manoj Samel manojsamelt...@gmail.com wrote: Spark 1.2 SchemaRDD has schema with decimal columns created like x1 = new StructField("a", DecimalType(14,4), true) x2 = new StructField("b", DecimalType(14,4), true) Registering as a SQL temp table and doing SQL queries on these columns, including SUM etc., works fine, so the Decimal schema does not seem to be the issue. When doing saveAsParquetFile on the RDD, it gives the following error. Not sure why the DecimalType in SchemaRDD is not seen by Parquet, which seems to see it as scala.math.BigDecimal java.lang.ClassCastException: scala.math.BigDecimal cannot be cast to org.apache.spark.sql.catalyst.types.decimal.Decimal at org.apache.spark.sql.parquet.MutableRowWriteSupport.consumeType( ParquetTableSupport.scala:359) at org.apache.spark.sql.parquet.MutableRowWriteSupport.write( ParquetTableSupport.scala:328) at org.apache.spark.sql.parquet.MutableRowWriteSupport.write( ParquetTableSupport.scala:314) at parquet.hadoop.InternalParquetRecordWriter.write( InternalParquetRecordWriter.java:120) at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81) at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37) at org.apache.spark.sql.parquet.InsertIntoParquetTable.org $apache$spark$sql$parquet$InsertIntoParquetTable$$writeShard$1( ParquetTableOperations.scala:308) at org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply( ParquetTableOperations.scala:325) at org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply( ParquetTableOperations.scala:325) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker( ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run( ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744)
Kryo serialization and OOM
I have about 500 MB of data and I'm trying to process it on a single `local` instance. I'm getting an Out of Memory exception. Stack trace at the end. Spark 1.1.1 My JVM has -Xmx2g spark.driver.memory = 1000M spark.executor.memory = 1000M spark.kryoserializer.buffer.mb = 256 spark.kryoserializer.buffer.max.mb = 256 The objects I'm dealing with are well constrained. Each can be no more than 500 bytes at the very most. I ran into problems with the Kryo buffer being too small, but I think that 256 MB should do the job. The docs say "This must be larger than any object you attempt to serialize." No danger of that. My input is a single file (on average each line is 500 bytes). I'm performing various filter, map, flatMap, groupByKey and reduceByKey. The only 'actions' I'm performing are foreach, which inserts values into a database. On input, I'm parsing the lines and then persisting with DISK_ONLY. I'm foreaching over the keys and then foreaching over the values of key value RDDs. The docs say that groupByKey returns (K, Iterable<V>). So the values (which can be large) shouldn't be serialized as a single list. So I don't think I should be loading anything larger than 256 MB at once. My code works for small sample toy data and I'm trying it out on a bit more. As I understand it, the way that Spark partitions data means that (in most cases) any job that will run on a cluster will also run on a single instance, given enough time. I think I've given enough memory to cover my serialization needs as I understand them. Have I misunderstood? Joe Stack trace: INFO org.apache.spark.scheduler.TaskSetManager - Starting task 0.0 in stage 30.0 (TID 116, localhost, PROCESS_LOCAL, 993 bytes) INFO org.apache.spark.executor.Executor - Running task 0.0 in stage 30.0 (TID 116) ... ERROR org.apache.spark.executor.Executor - Exception in task 0.0 in stage 30.0 (TID 116) java.lang.OutOfMemoryError: Java heap space at com.esotericsoftware.kryo.io.Output.init(Output.java:35) at org.apache.spark.serializer.KryoSerializer.newKryoOutput(KryoSerializer.scala:58) at org.apache.spark.serializer.KryoSerializerInstance.output$lzycompute(KryoSerializer.scala:151) at org.apache.spark.serializer.KryoSerializerInstance.output(KryoSerializer.scala:151) at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:155) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:188) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) ... WARN org.apache.spark.scheduler.TaskSetManager - Lost task 0.0 in stage 30.0 (TID 116, localhost): java.lang.OutOfMemoryError: Java heap space com.esotericsoftware.kryo.io.Output.init(Output.java:35) org.apache.spark.serializer.KryoSerializer.newKryoOutput(KryoSerializer.scala:58) org.apache.spark.serializer.KryoSerializerInstance.output$lzycompute(KryoSerializer.scala:151) org.apache.spark.serializer.KryoSerializerInstance.output(KryoSerializer.scala:151) org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:155) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:188) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) java.lang.Thread.run(Thread.java:745)
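For reference, those settings expressed programmatically; the property names mirror the Spark 1.1-era ones quoted above, and the explicit spark.serializer line is an assumption, since Kryo has to be enabled somewhere:

  import org.apache.spark.{SparkConf, SparkContext}

  val conf = new SparkConf()
    .setMaster("local")
    .setAppName("kryo-oom")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryoserializer.buffer.mb", "256")     // initial buffer, in MB
    .set("spark.kryoserializer.buffer.max.mb", "256") // hard cap, in MB
  val sc = new SparkContext(conf)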
Re: Spark (SQL) as OLAP engine
We have gone down a similar path at Webtrends; Spark has worked amazingly well for us in this use case. Our solution goes from REST, directly into Spark, and back out to the UI instantly. Here is the resulting product in case you are curious (and please pardon the self promotion): https://www.webtrends.com/support-training/training/explore-onboarding/ How can I automatically cache the data once a day... If you are not memory-bounded you could easily cache the daily results for some span of time and re-union them together each time you add new data. You would service queries off the unioned RDD. ... and make them available on a web service From the unioned RDD you could always step into Spark SQL at that point. Or you could use a simple scatter/gather pattern for this. As with all things Spark, this is super easy to do: just use aggregate()()! Cheers, Sean On Feb 3, 2015, at 9:59 AM, Adamantios Corais adamantios.cor...@gmail.com wrote: Hi, After some research I have decided that Spark (SQL) would be ideal for building an OLAP engine. My goal is to push aggregated data (to Cassandra or other low-latency data storage) and then be able to project the results on a web page (web service). New data will be added (aggregated) once a day, only. On the other hand, the web service must be able to run some fixed(?) queries (either on Spark or Spark SQL) at anytime and plot the results with D3.js. Note that I can already achieve similar speeds while in REPL mode by caching the data. Therefore, I believe that my problem must be re-phrased as follows: How can I automatically cache the data once a day and make them available on a web service that is capable of running any Spark or Spark (SQL) statement in order to plot the results with D3.js? Note that I have already some experience in Spark (+Spark SQL) as well as D3.js but not at all with OLAP engines (at least in their traditional form). Any ideas or suggestions? // Adamantios
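A sketch of the daily cache-and-union idea follows; Record, the input path and loadDay are all placeholders, not Webtrends' actual code:

  import org.apache.spark.rdd.RDD

  case class Record(key: String, value: Long)

  // Hypothetical loader for one day's aggregates; path/format are placeholders.
  def loadDay(day: String): RDD[Record] =
    sc.textFile(s"/daily/$day.csv").map { l =>
      val a = l.split(","); Record(a(0), a(1).toLong)
    }

  // Each day's RDD is cached once; the serving RDD is the running union,
  // and queries (plain Spark or Spark SQL) run against `serving`.
  var serving: RDD[Record] = sc.emptyRDD[Record]
  def addDay(day: String): Unit = {
    serving = serving.union(loadDay(day).cache())
  }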
Re: ephemeral-hdfs vs persistent-hdfs - performance
Thanks very much, that's good to know, I'll certainly give it a look. Can you give me a hint about how you unzip your input files on the fly? I thought that it wasn't possible to parallelize zipped inputs unless they were unzipped before passing to Spark? Joe On 3 February 2015 at 17:48, David Rosenstrauch dar...@darose.net wrote: We use S3 as the main storage for all our input data and our generated (output) data. (10's of terabytes of data daily.) We read gzipped data directly from S3 in our Hadoop/Spark jobs - it's not crazily slow, as long as you parallelize the work well by distributing the processing across enough machines. (About 100 nodes, in our case.) The way we generally operate, storage-wise, is: read input directly from s3, write output from Hadoop/Spark jobs to HDFS, then after the job is complete, distcp the relevant output from HDFS back to S3. Works for us ... YMMV. :-) HTH, DR On 02/03/2015 12:32 PM, Joe Wass wrote: The data is coming from S3 in the first place, and the results will be uploaded back there. But even in the same availability zone, fetching 170 GB (that's gzipped) is slow. From what I understand of the pipelines, multiple transforms on the same RDD might involve re-reading the input, which very quickly adds up in comparison to having the data locally. Unless I persist the data (which I am in fact doing), but that would involve storing approximately the same amount of data in HDFS, which wouldn't fit. Also, I understood that S3 was unsuitable for practical use? See Why you cannot use S3 as a replacement for HDFS[0]. I'd love to be proved wrong, though, that would make things a lot easier. [0] http://wiki.apache.org/hadoop/AmazonS3 On 3 February 2015 at 16:45, David Rosenstrauch dar...@darose.net wrote: You could also just push the data to Amazon S3, which would un-link the size of the cluster needed to process the data from the size of the data. DR On 02/03/2015 11:43 AM, Joe Wass wrote: I want to process about 800 GB of data on an Amazon EC2 cluster. So, I need to store the input in HDFS somehow. I currently have a cluster of 5 x m3.xlarge, each of which has 80GB disk. Each HDFS node reports 73 GB, and the total capacity is ~370 GB. If I want to process 800 GB of data (assuming I can't split the jobs up), I'm guessing I need to get persistent-hdfs involved. 1 - Does persistent-hdfs have noticeably different performance than ephemeral-hdfs? 2 - If so, is there a recommended configuration (like storing input and output on persistent, but persisted RDDs on ephemeral?) This seems like a common use-case, so sorry if this has already been covered. Joe - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
GraphX pregel: getting the current iteration number
Hi Folks, I'm new to GraphX and Scala, and my sendMsg function needs to index into an input list to my algorithm based on the pregel()() iteration number, but I don't see a way to access that. I see in https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala that it's just an index variable i in a while loop, but is there a way for sendMsg to access it within the loop's scope? I don't think so, so barring that, given Scala's functional stateless nature, what other approaches would you take to do this? I'm considering a closure, but a var that gets updated by all the sendMsgs seems a recipe for trouble. Thank you, matt - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark (SQL) as OLAP engine
Write out the RDD to a Cassandra table. The DataStax driver provides saveToCassandra() for this purpose. On Tue Feb 03 2015 at 8:59:15 AM Adamantios Corais adamantios.cor...@gmail.com wrote: Hi, After some research I have decided that Spark (SQL) would be ideal for building an OLAP engine. My goal is to push aggregated data (to Cassandra or other low-latency data storage) and then be able to project the results on a web page (web service). New data will be added (aggregated) once a day, only. On the other hand, the web service must be able to run some fixed(?) queries (either on Spark or Spark SQL) at anytime and plot the results with D3.js. Note that I can already achieve similar speeds while in REPL mode by caching the data. Therefore, I believe that my problem must be re-phrased as follows: How can I automatically cache the data once a day and make them available on a web service that is capable of running any Spark or Spark (SQL) statement in order to plot the results with D3.js? Note that I have already some experience in Spark (+Spark SQL) as well as D3.js but not at all with OLAP engines (at least in their traditional form). Any ideas or suggestions? *// Adamantios*
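Concretely, with the DataStax Spark Cassandra connector that call looks roughly like this; the keyspace, table, column names and the `aggregated` RDD are placeholders:

  import com.datastax.spark.connector._

  // Push the pre-aggregated results into Cassandra; the web service then
  // reads from this table instead of touching Spark at query time.
  aggregated.saveToCassandra("olap", "daily_aggregates", SomeColumns("key", "day", "value"))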
Re: Setting maxPrintString in Spark Repl to view SQL query plans
I'll add that I usually just do println(query.queryExecution) On Tue, Feb 3, 2015 at 11:34 AM, Michael Armbrust mich...@databricks.com wrote: You should be able to do something like: sbt -Dscala.repl.maxprintstring=64000 hive/console Here's an overview of catalyst: https://docs.google.com/a/databricks.com/document/d/1Hc_Ehtr0G8SQUg69cmViZsMi55_Kf3tISD9GPGU5M1Y/edit#heading=h.vp2tej73rtm2 On Tue, Feb 3, 2015 at 1:37 AM, Mick Davies michael.belldav...@gmail.com wrote: Hi, I want to increase the maxPrintString of the Spark repl to look at SQL query plans, as they are truncated by default at 800 chars, but don't know how to set this. You don't seem to be able to do it in the same way as you would with the Scala repl. Anyone know how to set this? Also anyone know a good document describing the interpretation of Spark SQL query plans? Thanks Mick -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Setting-maxPrintString-in-Spark-Repl-to-view-SQL-query-plans-tp21476.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
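Both tips combined, as a short sketch (the table and query are placeholders):

  // queryExecution's toString prints the analyzed, optimized and physical
  // plans in full, sidestepping the repl's 800-char truncation.
  val query = sqlContext.sql("SELECT * FROM t WHERE x > 0")
  println(query.queryExecution)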
Re: Supported Notebooks (and other viz tools) for Spark 0.9.1?
Adamantios, As said, I backported it to 0.9.x and now it's pushed on this branch: https://github.com/andypetrella/spark-notebook/tree/spark-0.9.x. I didn't create a dist atm, because I'd prefer to do it only if necessary :-). So, if you want to try it out, just clone the repo, check out this branch and launch `sbt run`. HTH, andy On Tue Feb 03 2015 at 2:45:43 PM andy petrella andy.petre...@gmail.com wrote: Hello Adamantios, Thanks for the poke and the interest. Actually, you're the second asking about backporting it. Yesterday (late), I created a branch for it... and the simple local spark test worked! \o/. However, it'll be the 'old' UI :-/. Since I didn't ported the code using play 2.2.6 to the new ui. FYI: play 2.2.6 uses a compliant akka version, that's why I mention it. It was too late for a push :-D, so I'll commit and push this evening. At least, you can try it tomorrow. I shall be on gitter this evening as well if there are questions: https://gitter.im/andypetrella/spark-notebook Cheers, andy On Tue Feb 03 2015 at 2:05:35 PM Adamantios Corais adamantios.cor...@gmail.com wrote: Hi, I am using Spark 0.9.1 and I am looking for a proper viz tools that supports that specific version. As far as I have seen all relevant tools (e.g. spark-notebook, zeppelin-project etc) only support 1.1 or 1.2; no mentions about older versions of Spark. Any ideas or suggestions? *// Adamantios*
Re: GraphX pregel: getting the current iteration number
I don't think it's possible to access it. What I've done before is send the current or next iteration index with the message, where the message is a case class. HTH Dan On Tue, Feb 3, 2015 at 10:20 AM, Matthew Cornell corn...@cs.umass.edu wrote: Hi Folks, I'm new to GraphX and Scala, and my sendMsg function needs to index into an input list to my algorithm based on the pregel()() iteration number, but I don't see a way to access that. I see in https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala that it's just an index variable i in a while loop, but is there a way for sendMsg to access it within the loop's scope? I don't think so, so barring that, given Scala's functional stateless nature, what other approaches would you take to do this? I'm considering a closure, but a var that gets updated by all the sendMsgs seems a recipe for trouble. Thank you, matt - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
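A sketch of that approach follows; the payload, the merge policy and the Graph[Msg, Int] input are placeholders, not Matt's actual algorithm:

  import org.apache.spark.graphx._

  // Carry the iteration counter inside the message so sendMsg can read it.
  case class Msg(iteration: Int, payload: Double)

  def vprog(id: VertexId, attr: Msg, msg: Msg): Msg = msg
  def sendMsg(t: EdgeTriplet[Msg, Int]): Iterator[(VertexId, Msg)] =
    Iterator((t.dstId, Msg(t.srcAttr.iteration + 1, t.srcAttr.payload)))
  def mergeMsg(a: Msg, b: Msg): Msg = if (a.iteration >= b.iteration) a else b

  // graph: Graph[Msg, Int] is assumed to be built elsewhere
  val result = Pregel(graph, Msg(0, 0.0))(vprog, sendMsg, mergeMsg)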
Re: Setting maxPrintString in Spark Repl to view SQL query plans
You should be able to do something like: sbt -Dscala.repl.maxprintstring=64000 hive/console Here's an overview of catalyst: https://docs.google.com/a/databricks.com/document/d/1Hc_Ehtr0G8SQUg69cmViZsMi55_Kf3tISD9GPGU5M1Y/edit#heading=h.vp2tej73rtm2 On Tue, Feb 3, 2015 at 1:37 AM, Mick Davies michael.belldav...@gmail.com wrote: Hi, I want to increase the maxPrintString of the Spark repl to look at SQL query plans, as they are truncated by default at 800 chars, but don't know how to set this. You don't seem to be able to do it in the same way as you would with the Scala repl. Anyone know how to set this? Also anyone know a good document describing the interpretation of Spark SQL query plans? Thanks Mick -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Setting-maxPrintString-in-Spark-Repl-to-view-SQL-query-plans-tp21476.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: ephemeral-hdfs vs persistent-hdfs - performance
Not all of our input files are zipped. The ones that are, obviously, are not parallelized - they're just processed by a single task. Not a big issue for us, though, as those zipped files aren't too big. DR On 02/03/2015 01:08 PM, Joe Wass wrote: Thanks very much, that's good to know, I'll certainly give it a look. Can you give me a hint about how you unzip your input files on the fly? I thought that it wasn't possible to parallelize zipped inputs unless they were unzipped before passing to Spark? Joe On 3 February 2015 at 17:48, David Rosenstrauch dar...@darose.net wrote: We use S3 as the main storage for all our input data and our generated (output) data. (10's of terabytes of data daily.) We read gzipped data directly from S3 in our Hadoop/Spark jobs - it's not crazily slow, as long as you parallelize the work well by distributing the processing across enough machines. (About 100 nodes, in our case.) The way we generally operate, storage-wise, is: read input directly from s3, write output from Hadoop/Spark jobs to HDFS, then after the job is complete, distcp the relevant output from HDFS back to S3. Works for us ... YMMV. :-) HTH, DR On 02/03/2015 12:32 PM, Joe Wass wrote: The data is coming from S3 in the first place, and the results will be uploaded back there. But even in the same availability zone, fetching 170 GB (that's gzipped) is slow. From what I understand of the pipelines, multiple transforms on the same RDD might involve re-reading the input, which very quickly adds up in comparison to having the data locally. Unless I persist the data (which I am in fact doing), but that would involve storing approximately the same amount of data in HDFS, which wouldn't fit. Also, I understood that S3 was unsuitable for practical use? See Why you cannot use S3 as a replacement for HDFS[0]. I'd love to be proved wrong, though, that would make things a lot easier. [0] http://wiki.apache.org/hadoop/AmazonS3 On 3 February 2015 at 16:45, David Rosenstrauch dar...@darose.net wrote: You could also just push the data to Amazon S3, which would un-link the size of the cluster needed to process the data from the size of the data. DR On 02/03/2015 11:43 AM, Joe Wass wrote: I want to process about 800 GB of data on an Amazon EC2 cluster. So, I need to store the input in HDFS somehow. I currently have a cluster of 5 x m3.xlarge, each of which has 80GB disk. Each HDFS node reports 73 GB, and the total capacity is ~370 GB. If I want to process 800 GB of data (assuming I can't split the jobs up), I'm guessing I need to get persistent-hdfs involved. 1 - Does persistent-hdfs have noticeably different performance than ephemeral-hdfs? 2 - If so, is there a recommended configuration (like storing input and output on persistent, but persisted RDDs on ephemeral?) This seems like a common use-case, so sorry if this has already been covered. Joe - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Sort based shuffle not working properly?
This is an excerpt from the design document of the implementation of sort-based shuffle. I am thinking I might be wrong in my perception of sort-based shuffle. Don't completely understand it though. *Motivation* A sort-based shuffle can be more scalable than Spark's current hash-based one because it doesn't require writing a separate file for each reduce task from each mapper. Instead, we write a single sorted file and serve ranges of it to different reducers. In jobs with a lot of reduce tasks (say 10,000+), this saves significant memory for compression and serialization buffers and results in more sequential disk I/O. *Implementation* To perform a sort-based shuffle, each map task will produce one or more output files sorted by a key's partition ID, then merge-sort them to yield a single output file. Because it's only necessary to group the keys together into partitions, we won't bother to also sort them within each partition. On Tue, Feb 3, 2015 at 5:41 PM, Nitin kak nitinkak...@gmail.com wrote: I thought that's what sort-based shuffle did: sort the keys going to the same partition. I have tried (c1, c2) as an (Int, Int) tuple as well. I don't think that the ordering of c2's type is the problem here. On Tue, Feb 3, 2015 at 5:21 PM, Sean Owen so...@cloudera.com wrote: Hm, I don't think the sort partitioner is going to cause the result to be ordered by c1,c2 if you only partitioned on c1. I mean, it's not even guaranteed that the type of c2 has an ordering, right? On Tue, Feb 3, 2015 at 3:38 PM, nitinkak001 nitinkak...@gmail.com wrote: I am trying to implement secondary sort in spark as we do in map-reduce. Here is my data (tab separated, without the header c1, c2, c3): c1 c2 c3 1 2 4 1 3 6 2 4 7 2 6 8 3 5 5 3 1 8 3 2 0 To do secondary sort, I create a paired RDD as ((c1 + "," + c2), row) and then use a custom partitioner to partition only on c1. I have set spark.shuffle.manager = SORT so the keys per partition are sorted. For the key 3 I am expecting to get (3, 1) (3, 2) (3, 5) but still getting the original order 3,5 3,1 3,2 Here is the custom partitioner code: class StraightPartitioner(p: Int) extends org.apache.spark.Partitioner { def numPartitions = p def getPartition(key: Any) = { key.asInstanceOf[String].split(",")(0).toInt } } and driver code, please tell me what I am doing wrong: val conf = new SparkConf().setAppName("MapInheritanceExample") conf.set("spark.shuffle.manager", "SORT"); val sc = new SparkContext(conf) val pF = sc.textFile(inputFile) val log = LogFactory.getLog("MapFunctionTest") val partitionedRDD = pF.map { x => var arr = x.split("\t"); (arr(0) + "," + arr(1), null) }.partitionBy(new StraightPartitioner(10)) var outputRDD = partitionedRDD.mapPartitions(p => { p.map({ case (o, n) => { o } }) }) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Sort-based-shuffle-not-working-properly-tp21487.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org Sort-basedshuffledesign.pdf Description: Adobe PDF document - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
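For what it's worth, on Spark 1.2+ there is a more direct route to the ordering this thread is after: repartitionAndSortWithinPartitions sorts by the full key while the partitioner looks only at c1. A sketch under those assumptions, with the input path and partition count as placeholders:

  import org.apache.spark.Partitioner
  import org.apache.spark.SparkContext._ // pair-RDD implicits in 1.x

  class C1Partitioner(parts: Int) extends Partitioner {
    def numPartitions: Int = parts
    def getPartition(key: Any): Int =
      math.abs(key.asInstanceOf[(Int, Int)]._1 % parts)
  }

  val pairs = sc.textFile("input.tsv").map { line =>
    val a = line.split("\t")
    ((a(0).toInt, a(1).toInt), line) // key = (c1, c2), ordered as a tuple
  }
  val sorted = pairs.repartitionAndSortWithinPartitions(new C1Partitioner(10))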
Re: 2GB limit for partitions?
To be clear, there is no distinction between partitions and blocks for RDD caching (each RDD partition corresponds to 1 cache block). The distinction is important for shuffling, where by definition N partitions are shuffled into M partitions, creating N*M intermediate blocks. Each of these blocks must also be smaller than 2GB, but due to their number, this is an atypical scenario. If you do sc.parallelize(1 to 1e6.toInt, 1).map{i => new Array[Byte](5e3.toInt)}.repartition(1000).count() you should not see this error, as the 5GB initial partition was split into 1000 partitions of 5MB each, during a shuffle. On the other hand, sc.parallelize(1 to 1e6.toInt, 1).map{i => new Array[Byte](5e3.toInt)}.repartition(1).count() may have the same error as Imran showed for caching, and for the same reason. On Tue, Feb 3, 2015 at 3:00 PM, Imran Rashid iras...@cloudera.com wrote: Michael, you are right, there is definitely some limit at 2GB. Here is a trivial example to demonstrate it: import org.apache.spark.storage.StorageLevel val d = sc.parallelize(1 to 1e6.toInt, 1).map{i => new Array[Byte](5e3.toInt)}.persist(StorageLevel.DISK_ONLY) d.count() It gives the same error you are observing. I was under the same impression as Sean about the limits only being on blocks, not partitions -- but clearly that isn't the case here. I don't know the whole story yet, but I just wanted to at least let you know you aren't crazy :) At the very least this suggests that you might need to make smaller partitions for now. Imran On Tue, Feb 3, 2015 at 4:58 AM, Michael Albert m_albert...@yahoo.com.invalid wrote: Greetings! Thanks for the response. Below is an example of the exception I saw. I'd rather not post code at the moment, so I realize it is completely unreasonable to ask for a diagnosis. However, I will say that adding a partitionBy() was the last change before this error was created. Thanks for your time and any thoughts you might have. 
Sincerely, Mike Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 5.0 failed 4 times, most recent failure: Lost task 4.3 in stage 5.0 (TID 6012, ip-10-171-0-31.ec2.internal): java.lang.RuntimeException: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132) at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517) at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57) -- *From:* Sean Owen so...@cloudera.com *To:* Michael Albert m_albert...@yahoo.com *Cc:* user@spark.apache.org user@spark.apache.org *Sent:* Monday, February 2, 2015 10:13 PM *Subject:* Re: 2GB limit for partitions? The limit is on blocks, not partitions. Partitions have many blocks. It sounds like you are creating very large values in memory, but I'm not sure given your description. You will run into problems if a single object is more than 2GB, of course. More of the stack trace might show what is mapping that much memory. If you simply want data into 1000 files it's a lot simpler. Just repartition into 1000 partitions and save the data. If you need more control over what goes into which partition, use a Partitioner, yes. On Mon, Feb 2, 2015 at 8:40 PM, Michael Albert m_albert...@yahoo.com.invalid wrote: Greetings! SPARK-1476 says that there is a 2G limit for blocks. Is this the same as a 2G limit for partitions (or approximately so?)? What I had been attempting to do is the following. 1) Start with a moderately large data set (currently about 100GB, but growing). 2) Create about 1,000 files (yes, files) each representing a subset of the data. The current attempt I am working on is something like this. 1) Do a map whose output key indicates which of the 1,000 files it will go into and whose value is what I will want to stick into the file. 2) Partition the data and use
Re: 2GB limit for partitions?
Thank you! This is very helpful. -Mike From: Aaron Davidson ilike...@gmail.com To: Imran Rashid iras...@cloudera.com Cc: Michael Albert m_albert...@yahoo.com; Sean Owen so...@cloudera.com; user@spark.apache.org user@spark.apache.org Sent: Tuesday, February 3, 2015 6:13 PM Subject: Re: 2GB limit for partitions? To be clear, there is no distinction between partitions and blocks for RDD caching (each RDD partition corresponds to 1 cache block). The distinction is important for shuffling, where by definition N partitions are shuffled into M partitions, creating N*M intermediate blocks. Each of these blocks must also be smaller than 2GB, but due to their number, this is an atypical scenario. If you do sc.parallelize(1 to 1e6.toInt, 1).map{i => new Array[Byte](5e3.toInt)}.repartition(1000).count() you should not see this error, as the 5GB initial partition was split into 1000 partitions of 5MB each, during a shuffle. On the other hand, sc.parallelize(1 to 1e6.toInt, 1).map{i => new Array[Byte](5e3.toInt)}.repartition(1).count() may have the same error as Imran showed for caching, and for the same reason. On Tue, Feb 3, 2015 at 3:00 PM, Imran Rashid iras...@cloudera.com wrote: Michael, you are right, there is definitely some limit at 2GB. Here is a trivial example to demonstrate it: import org.apache.spark.storage.StorageLevel val d = sc.parallelize(1 to 1e6.toInt, 1).map{i => new Array[Byte](5e3.toInt)}.persist(StorageLevel.DISK_ONLY) d.count() It gives the same error you are observing. I was under the same impression as Sean about the limits only being on blocks, not partitions -- but clearly that isn't the case here. I don't know the whole story yet, but I just wanted to at least let you know you aren't crazy :) At the very least this suggests that you might need to make smaller partitions for now. Imran On Tue, Feb 3, 2015 at 4:58 AM, Michael Albert m_albert...@yahoo.com.invalid wrote: Greetings! Thanks for the response. Below is an example of the exception I saw. I'd rather not post code at the moment, so I realize it is completely unreasonable to ask for a diagnosis. However, I will say that adding a partitionBy() was the last change before this error was created. Thanks for your time and any thoughts you might have. 
Sincerely, Mike Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 5.0 failed 4 times, most recent failure: Lost task 4.3 in stage 5.0 (TID 6012, ip-10-171-0-31.ec2.internal): java.lang.RuntimeException: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132) at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517) at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57) From: Sean Owen so...@cloudera.com To: Michael Albert m_albert...@yahoo.com Cc: user@spark.apache.org user@spark.apache.org Sent: Monday, February 2, 2015 10:13 PM Subject: Re: 2GB limit for partitions? The limit is on blocks, not partitions. Partitions have many blocks. It sounds like you are creating very large values in memory, but I'm not sure given your description. You will run into problems if a single object is more than 2GB, of course. More of the stack trace might show what is mapping that much memory. If you simply want data into 1000 files it's a lot simpler. Just repartition into 1000 partitions and save the data. If you need more control over what goes into which partition, use a Partitioner, yes. On Mon, Feb 2, 2015 at 8:40 PM, Michael Albert m_albert...@yahoo.com.invalid wrote: Greetings! SPARK-1476 says that there is a 2G limit for blocks. Is this the same as a 2G limit for partitions (or approximately so?)? What I had been attempting to do is the following. 1) Start with a moderately large data set (currently about 100GB, but growing). 2) Create about 1,000 files (yes, files) each representing a subset of the data. The current attempt
Re: 2GB limit for partitions?
Thanks for the explanations, makes sense. For the record, it looks like this was worked on a while back (and maybe the work is even close to a solution?) https://issues.apache.org/jira/browse/SPARK-1476 and perhaps an independent solution was worked on here? https://issues.apache.org/jira/browse/SPARK-1391 On Tue, Feb 3, 2015 at 5:20 PM, Reynold Xin r...@databricks.com wrote: cc dev list How are you saving the data? There are two relevant 2GB limits: 1. Caching 2. Shuffle For caching, a partition is turned into a single block. For shuffle, each map partition is partitioned into R blocks, where R = number of reduce tasks. It is unlikely a shuffle block > 2G, although it can still happen. I think the 2nd problem is easier to fix than the 1st, because we can handle that in the network transport layer. It'd require us to divide the transfer of a very large block into multiple smaller blocks. On Tue, Feb 3, 2015 at 3:00 PM, Imran Rashid iras...@cloudera.com wrote: Michael, you are right, there is definitely some limit at 2GB. Here is a trivial example to demonstrate it: import org.apache.spark.storage.StorageLevel val d = sc.parallelize(1 to 1e6.toInt, 1).map{i => new Array[Byte](5e3.toInt)}.persist(StorageLevel.DISK_ONLY) d.count() It gives the same error you are observing. I was under the same impression as Sean about the limits only being on blocks, not partitions -- but clearly that isn't the case here. I don't know the whole story yet, but I just wanted to at least let you know you aren't crazy :) At the very least this suggests that you might need to make smaller partitions for now. Imran On Tue, Feb 3, 2015 at 4:58 AM, Michael Albert m_albert...@yahoo.com.invalid wrote: Greetings! Thanks for the response. Below is an example of the exception I saw. I'd rather not post code at the moment, so I realize it is completely unreasonable to ask for a diagnosis. However, I will say that adding a partitionBy() was the last change before this error was created. Thanks for your time and any thoughts you might have. 
Sincerely, Mike Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 5.0 failed 4 times, most recent failure: Lost task 4.3 in stage 5.0 (TID 6012, ip-10-171-0-31.ec2.internal): java.lang.RuntimeException: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132) at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517) at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57) -- *From:* Sean Owen so...@cloudera.com *To:* Michael Albert m_albert...@yahoo.com *Cc:* user@spark.apache.org user@spark.apache.org *Sent:* Monday, February 2, 2015 10:13 PM *Subject:* Re: 2GB limit for partitions? The limit is on blocks, not partitions. Partitions have many blocks. It sounds like you are creating very large values in memory, but I'm not sure given your description. You will run into problems if a single object is more than 2GB, of course. More of the stack trace might show what is mapping that much memory. If you simply want data into 1000 files it's a lot simpler. Just repartition into 1000 partitions and save the data. If you need more control over what goes into which partition, use a Partitioner, yes. On Mon, Feb 2, 2015 at 8:40 PM, Michael Albert m_albert...@yahoo.com.invalid wrote: Greetings! SPARK-1476 says that there is a 2G limit for blocks. Is this the same as a 2G limit for partitions (or approximately so?)? What I had been attempting to do is the following. 1) Start with a moderately large data set (currently about 100GB, but growing). 2) Create about 1,000 files (yes, files) each representing a subset of the data. The current attempt I am working on is something like this. 1) Do a map whose output key indicates which of the 1,000
Re: Sort based shuffle not working properly?
I thought that's what sort-based shuffle did: sort the keys going to the same partition. I have tried (c1, c2) as an (Int, Int) tuple as well. I don't think that the ordering of c2's type is the problem here. On Tue, Feb 3, 2015 at 5:21 PM, Sean Owen so...@cloudera.com wrote: Hm, I don't think the sort partitioner is going to cause the result to be ordered by c1,c2 if you only partitioned on c1. I mean, it's not even guaranteed that the type of c2 has an ordering, right? On Tue, Feb 3, 2015 at 3:38 PM, nitinkak001 nitinkak...@gmail.com wrote: I am trying to implement secondary sort in spark as we do in map-reduce. Here is my data (tab separated, without the header c1, c2, c3): c1 c2 c3 1 2 4 1 3 6 2 4 7 2 6 8 3 5 5 3 1 8 3 2 0 To do secondary sort, I create a paired RDD as ((c1 + "," + c2), row) and then use a custom partitioner to partition only on c1. I have set spark.shuffle.manager = SORT so the keys per partition are sorted. For the key 3 I am expecting to get (3, 1) (3, 2) (3, 5) but still getting the original order 3,5 3,1 3,2 Here is the custom partitioner code: class StraightPartitioner(p: Int) extends org.apache.spark.Partitioner { def numPartitions = p def getPartition(key: Any) = { key.asInstanceOf[String].split(",")(0).toInt } } and driver code, please tell me what I am doing wrong: val conf = new SparkConf().setAppName("MapInheritanceExample") conf.set("spark.shuffle.manager", "SORT"); val sc = new SparkContext(conf) val pF = sc.textFile(inputFile) val log = LogFactory.getLog("MapFunctionTest") val partitionedRDD = pF.map { x => var arr = x.split("\t"); (arr(0) + "," + arr(1), null) }.partitionBy(new StraightPartitioner(10)) var outputRDD = partitionedRDD.mapPartitions(p => { p.map({ case (o, n) => { o } }) }) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Sort-based-shuffle-not-working-properly-tp21487.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: 2GB limit for partitions?
cc dev list How are you saving the data? There are two relevant 2GB limits: 1. Caching 2. Shuffle For caching, a partition is turned into a single block. For shuffle, each map partition is partitioned into R blocks, where R = number of reduce tasks. It is unlikely a shuffle block > 2G, although it can still happen. I think the 2nd problem is easier to fix than the 1st, because we can handle that in the network transport layer. It'd require us to divide the transfer of a very large block into multiple smaller blocks. On Tue, Feb 3, 2015 at 3:00 PM, Imran Rashid iras...@cloudera.com wrote: Michael, you are right, there is definitely some limit at 2GB. Here is a trivial example to demonstrate it: import org.apache.spark.storage.StorageLevel val d = sc.parallelize(1 to 1e6.toInt, 1).map{i => new Array[Byte](5e3.toInt)}.persist(StorageLevel.DISK_ONLY) d.count() It gives the same error you are observing. I was under the same impression as Sean about the limits only being on blocks, not partitions -- but clearly that isn't the case here. I don't know the whole story yet, but I just wanted to at least let you know you aren't crazy :) At the very least this suggests that you might need to make smaller partitions for now. Imran On Tue, Feb 3, 2015 at 4:58 AM, Michael Albert m_albert...@yahoo.com.invalid wrote: Greetings! Thanks for the response. Below is an example of the exception I saw. I'd rather not post code at the moment, so I realize it is completely unreasonable to ask for a diagnosis. However, I will say that adding a partitionBy() was the last change before this error was created. Thanks for your time and any thoughts you might have. Sincerely, Mike Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 5.0 failed 4 times, most recent failure: Lost task 4.3 in stage 5.0 (TID 6012, ip-10-171-0-31.ec2.internal): java.lang.RuntimeException: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132) at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517) at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57) -- *From:* Sean Owen so...@cloudera.com *To:* Michael Albert m_albert...@yahoo.com *Cc:* user@spark.apache.org user@spark.apache.org *Sent:* Monday, February 2, 2015 10:13 PM *Subject:* Re: 2GB limit for partitions? The limit is on blocks, not partitions. Partitions have many blocks. It sounds like you are creating very large values in memory, but I'm not sure given your description. 
You will run into problems if a single object is more than 2GB, of course. More of the stack trace might show what is mapping that much memory. If you simply want data into 1000 files it's a lot simpler. Just repartition into 1000 partitions and save the data. If you need more control over what goes into which partition, use a Partitioner, yes. On Mon, Feb 2, 2015 at 8:40 PM, Michael Albert m_albert...@yahoo.com.invalid wrote: Greetings! SPARK-1476 says that there is a 2G limit for blocks. Is this the same as a 2G limit for partitions (or approximately so?)? What I had been attempting to do is the following. 1) Start with a moderately large data set (currently about 100GB, but growing). 2) Create about 1,000 files (yes, files) each representing a subset of the data. The current attempt I am working on is something like this. 1) Do a map whose output key indicates which of the 1,000 files it will go into and whose value is what I will want to stick into the file. 2) Partition the data and use the body of mapPartition to open a file and save the data. My apologies, this is actually embedded in a bigger mess, so I won't post it. However, I get errors telling me that there is an IllegalArgumentException: Size exceeds Integer.MAX_VALUE, with sun.nio.ch.FileChannelImpl.map at the top of the stack. This leads me to think that I have hit the limit on partition and/or block size. Perhaps this is not a good way to do it? I suppose I could run 1,000 passes over the data, each time collecting the output for one of my 1,000 final files, but that seems likely to be painfully slow to run. Am I missing something? Admittedly, this is an odd use case. Thanks! Sincerely, Mike Albert - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: 2GB limit for partitions?
Michael, you are right, there is definitely some limit at 2GB. Here is a trivial example to demonstrate it: import org.apache.spark.storage.StorageLevel val d = sc.parallelize(1 to 1e6.toInt, 1).map{i => new Array[Byte](5e3.toInt)}.persist(StorageLevel.DISK_ONLY) d.count() It gives the same error you are observing. I was under the same impression as Sean about the limits only being on blocks, not partitions -- but clearly that isn't the case here. I don't know the whole story yet, but I just wanted to at least let you know you aren't crazy :) At the very least this suggests that you might need to make smaller partitions for now. Imran On Tue, Feb 3, 2015 at 4:58 AM, Michael Albert m_albert...@yahoo.com.invalid wrote: Greetings! Thanks for the response. Below is an example of the exception I saw. I'd rather not post code at the moment, so I realize it is completely unreasonable to ask for a diagnosis. However, I will say that adding a partitionBy() was the last change before this error was created. Thanks for your time and any thoughts you might have. Sincerely, Mike Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 5.0 failed 4 times, most recent failure: Lost task 4.3 in stage 5.0 (TID 6012, ip-10-171-0-31.ec2.internal): java.lang.RuntimeException: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132) at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517) at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57) -- *From:* Sean Owen so...@cloudera.com *To:* Michael Albert m_albert...@yahoo.com *Cc:* user@spark.apache.org user@spark.apache.org *Sent:* Monday, February 2, 2015 10:13 PM *Subject:* Re: 2GB limit for partitions? The limit is on blocks, not partitions. Partitions have many blocks. It sounds like you are creating very large values in memory, but I'm not sure given your description. You will run into problems if a single object is more than 2GB, of course. More of the stack trace might show what is mapping that much memory. If you simply want data into 1000 files it's a lot simpler. Just repartition into 1000 partitions and save the data. If you need more control over what goes into which partition, use a Partitioner, yes. On Mon, Feb 2, 2015 at 8:40 PM, Michael Albert m_albert...@yahoo.com.invalid wrote: Greetings! SPARK-1476 says that there is a 2G limit for blocks. Is this the same as a 2G limit for partitions (or approximately so?)? What I had been attempting to do is the following. 
1) Start with a moderately large data set (currently about 100GB, but growing). 2) Create about 1,000 files (yes, files) each representing a subset of the data. The current attempt I am working on is something like this. 1) Do a map whose output key indicates which of the 1,000 files it will go into and whose value is what I will want to stick into the file. 2) Partition the data and use the body of mapPartition to open a file and save the data. My apologies, this is actually embedded in a bigger mess, so I won't post it. However, I get errors telling me that there is an IllegalArgumentException: Size exceeds Integer.MAX_VALUE, with sun.nio.ch.FileChannelImpl.map at the top of the stack. This leads me to think that I have hit the limit on partition and/or block size. Perhaps this is not a good way to do it? I suppose I could run 1,000 passes over the data, each time collecting the output for one of my 1,000 final files, but that seems likely to be painfully slow to run. Am I missing something? Admittedly, this is an odd use case. Thanks! Sincerely, Mike Albert - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands,
Setting maxPrintString in Spark Repl to view SQL query plans
Hi, I want to increase the maxPrintString of the Spark repl to look at SQL query plans, as they are truncated by default at 800 chars, but I don't know how to set this. You don't seem to be able to do it in the same way as you would with the Scala repl. Does anyone know how to set this? Also, does anyone know a good document describing how to interpret Spark SQL query plans?

Thanks,
Mick
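Two hedged possibilities, not from the thread. First, newer Scala REPLs honor a scala.repl.maxprintstring system property (older 2.10 releases may not, so verify against your Spark/Scala version):

spark-shell --driver-java-options "-Dscala.repl.maxprintstring=64000"

Second, you can bypass the REPL's pretty-printer and print the plan yourself; queryExecution was a developer-API field on SchemaRDD in Spark 1.x (mySchemaRDD below stands for whatever query result you are inspecting):

// prints the full physical plan without the REPL's 800-char truncation
println(mySchemaRDD.queryExecution.executedPlan)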
Re: Spark streaming - tracking/deleting processed files
Hi, To keep processing the older files as well, you can use fileStream instead of textFileStream; it has a parameter that tells it to also look for files that are already present. For deleting the processed files, one way is to get the list of all files in the dStream. This can be done by using the foreachRDD api of the dStream received from the fileStream (or textFileStream). Suppose the dStream is:

JavaDStream<String> jpDstream = ssc.textFileStream("path/to/your/folder/");
jpDstream.print();
jpDstream.foreachRDD(new Function<JavaRDD<String>, Void>() {
  @Override
  public Void call(JavaRDD<String> arg0) throws Exception {
    getContentHigh(arg0, ssc);
    return null;
  }
});

public static void getContentHigh(JavaRDD<String> ds, JavaStreamingContext ssc) {
  // the number of partitions equals the number of files the stream picked up
  int lenPartition = ds.rdd().partitions().length;
  Partition[] listPartitions = ds.rdd().partitions();
  for (int i = 0; i < lenPartition; i++) {
    UnionPartition upp = (UnionPartition) listPartitions[i];
    NewHadoopPartition npp = (NewHadoopPartition) upp.parentPartition();
    String fPath = npp.serializableHadoopSplit().value().toString();
    String[] nT = fPath.split(":");
    String name = nT[0];
    // name is the path of the file picked for processing; the processing
    // logic can go inside this loop. Once done, you can delete the file
    // using the path in the variable name.
  }
}

Thanks.

On Fri, Jan 30, 2015 at 11:37 PM, ganterm [via Apache Spark User List] ml-node+s1001560n21444...@n3.nabble.com wrote:

We are running a Spark streaming job that retrieves files from a directory (using textFileStream). One concern we are having is the case where the job is down but files are still being added to the directory. Once the job starts up again, those files are not being picked up (since they are not new or changed while the job is running), but we would like them to be processed. Is there a solution for that? Is there a way to keep track of what files have been processed, and can we force older files to be picked up? Is there a way to delete the processed files?

Thanks!
Markus
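A hedged Scala sketch of the fileStream variant mentioned above; the newFilesOnly flag (false here) is the parameter that makes Spark also process files already present in the directory when the job starts. This assumes a Scala StreamingContext named ssc and the Spark 1.x signature of fileStream:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// accept every file, and do not restrict the stream to newly created ones
val stream = ssc.fileStream[LongWritable, Text, TextInputFormat](
  "path/to/your/folder/", (p: Path) => true, newFilesOnly = false)
val lines = stream.map(_._2.toString) // same payload textFileStream yields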
Re: 2GB limit for partitions?
Greetings! Thanks for the response. Below is an example of the exception I saw. I'd rather not post code at the moment, so I realize it is completely unreasonable to ask for a diagnosis. However, I will say that adding a partitionBy() was the last change before this error was created. Thanks for your time and any thoughts you might have.

Sincerely,
Mike

Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 5.0 failed 4 times, most recent failure: Lost task 4.3 in stage 5.0 (TID 6012, ip-10-171-0-31.ec2.internal): java.lang.RuntimeException: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307)
at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)

From: Sean Owen so...@cloudera.com
To: Michael Albert m_albert...@yahoo.com
Cc: user@spark.apache.org
Sent: Monday, February 2, 2015 10:13 PM
Subject: Re: 2GB limit for partitions?

The limit is on blocks, not partitions. Partitions have many blocks. It sounds like you are creating very large values in memory, but I'm not sure given your description. You will run into problems if a single object is more than 2GB, of course. More of the stack trace might show what is mapping that much memory. If you simply want data into 1000 files it's a lot simpler. Just repartition into 1000 partitions and save the data. If you need more control over what goes into which partition, use a Partitioner, yes.

On Mon, Feb 2, 2015 at 8:40 PM, Michael Albert m_albert...@yahoo.com.invalid wrote:

Greetings! SPARK-1476 says that there is a 2G limit for blocks. Is this the same as a 2G limit for partitions (or approximately so)? What I had been attempting to do is the following.

1) Start with a moderately large data set (currently about 100GB, but growing).
2) Create about 1,000 files (yes, files), each representing a subset of the data.

The current attempt I am working on is something like this.
1) Do a map whose output key indicates which of the 1,000 files it will go into and whose value is what I will want to stick into the file.
2) Partition the data and use the body of mapPartition to open a file and save the data.

My apologies, this is actually embedded in a bigger mess, so I won't post it. However, I get errors telling me that there is an IllegalArgumentException: Size exceeds Integer.MAX_VALUE, with sun.nio.ch.FileChannelImpl.map at the top of the stack. This leads me to think that I have hit the limit on partition and/or block size.
Perhaps this is not a good way to do it? I suppose I could run 1,000 passes over the data, each time collecting the output for one of my 1,000 final files, but that seems likely to be painfully slow. Am I missing something? Admittedly, this is an odd use case.

Thanks!
Sincerely,
Mike Albert
Re: Spark Shell Timeouts
I am not sure whether this will help you, but in my situation I could not see any input in the terminal after some work finished via spark-shell. Running the command "stty echo" fixed it.

Best,
Amoners
LeaseExpiredException while writing schemardd to hdfs
I want to write a whole SchemaRDD to a single file in HDFS, but I am facing the following exception:

org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /test/data/data1.csv (inode 402042): File does not exist. Holder DFSClient_NONMAPREDUCE_-564238432_57 does not have any open files

Here is my code:

rdd.foreachPartition { iterator =>
  val output = new Path(outputpath)
  val fs = FileSystem.get(new Configuration())
  val writer = new BufferedWriter(new OutputStreamWriter(fs.create(output)))
  val line = new StringBuilder
  iterator.foreach { row =>
    row.foreach { column =>
      line.append(column.toString + splitter)
    }
    writer.write(line.toString.dropRight(1))
    writer.newLine()
    line.clear
  }
  writer.close()
}

I think the problem is that I am creating a writer for each partition, and multiple writers execute in parallel, so the problem appears when they try to write to the same file. When I avoid this approach, I face a "task not serializable" exception. Any suggestions for handling this problem?
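One hedged way out of the lease conflict described above (not from the thread; outputpath and splitter are assumed to be defined as in the snippet, and rows are assumed iterable as they are there): give every partition its own file so no two executors ever hold a lease on the same path, and merge afterwards if one physical file is a hard requirement.

import java.io.{BufferedWriter, OutputStreamWriter}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

rdd.mapPartitionsWithIndex { (idx, iterator) =>
  // one file per partition: writers never race on a single HDFS path
  val fs = FileSystem.get(new Configuration())
  val writer = new BufferedWriter(new OutputStreamWriter(
    fs.create(new Path(outputpath + "/part-" + idx))))
  iterator.foreach { row =>
    writer.write(row.mkString(splitter))
    writer.newLine()
  }
  writer.close()
  Iterator(idx)
}.count() // mapPartitionsWithIndex is lazy; count() forces the writes

Alternatively, rdd.coalesce(1) before writing keeps a single writer and a single file, at the cost of funneling all data through one task.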
Re: Writing RDD to a csv file
In case anyone needs to merge all of their part-n files (small result set only) into a single *.csv file or needs to generically flatten case classes, tuples, etc., into comma separated values: http://deploymentzone.com/2015/01/30/spark-and-merged-csv-files/

On Tue Feb 03 2015 at 8:23:59 AM kundan kumar iitr.kun...@gmail.com wrote:

Thanks Gerard !! This is working.

On Tue, Feb 3, 2015 at 6:44 PM, Gerard Maas gerard.m...@gmail.com wrote:

This is more of a Scala question, so probably next time you'd like to address a Scala forum, e.g. http://stackoverflow.com/questions/tagged/scala

val optArrStr: Option[Array[String]] = ???
optArrStr.map(arr => arr.mkString(",")).getOrElse("") // empty string or whatever default value you have for this

kr, Gerard.

On Tue, Feb 3, 2015 at 2:09 PM, kundan kumar iitr.kun...@gmail.com wrote:

I have a RDD which is of type org.apache.spark.rdd.RDD[(String, (Array[String], Option[Array[String]]))] and I want to write it as a csv file. Please suggest how this can be done.

myrdd.map(line => line._1 + "," + line._2._1.mkString(",") + "," + line._2._2.mkString(",")).saveAsTextFile("hdfs://...")

Doing mkString on line._2._1 works but does not work for the Option type. Please suggest how this can be done.

Thanks,
Kundan
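Putting Gerard's fix into the original saveAsTextFile call, a hedged sketch for the exact RDD type in the question:

// flatten (String, (Array[String], Option[Array[String]])) into one CSV line,
// rendering a missing Option as an empty trailing field
val csv = myrdd.map { case (key, (values, maybeMore)) =>
  key + "," + values.mkString(",") + "," +
    maybeMore.map(_.mkString(",")).getOrElse("")
}
csv.saveAsTextFile("hdfs://...")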
advice on diagnosing Spark stall for 1.5hr out of 3.5hr job?
Greetings! First, my sincere thanks to all who have given me advice. Following previous discussion, I've rearranged my code to try to keep the partitions to more manageable sizes. Thanks to all who commented.

At the moment, the input set I'm trying to work with is about 90GB (avro parquet format). When I run on a reasonable chunk of the data (say half), things work reasonably. On the full data, the spark process stalls. That is, for about 1.5 hours out of a 3.5 hour run, I see no activity. No cpu usage, no error message, no network activity. It just seems to sit there. The messages bracketing the stall are shown below.

Any advice on how to diagnose this? I don't get any error messages. The spark UI says that it is running a stage, but it makes no discernible progress. Ganglia shows no CPU usage or network activity. When I shell into the worker nodes, there are no filled disks or other obvious problems. How can I discern what Spark is waiting for?

The only weird thing seen, other than the stall, is that the yarn logs on the workers have lines with messages like this:

2015-02-03 22:59:58,890 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl (Container Monitor): Memory usage of ProcessTree 13158 for container-id container_1422834185427_0083_01_21: 7.1 GB of 8.5 GB physical memory used; 7.6 GB of 42.5 GB virtual memory used

It's rather strange that it mentions 42.5 GB of virtual memory. The machines are EMR machines with 32 GB of physical memory and, as far as I can determine, no swap space.

Any advice is welcome. Thanks!
Sincerely,
Mike Albert

Before the stall:
15/02/03 21:45:28 INFO cluster.YarnClientClusterScheduler: Removed TaskSet 5.0, whose tasks have all completed, from pool
15/02/03 21:45:28 INFO scheduler.DAGScheduler: Stage 5 (mapPartitionsWithIndex at Transposer.scala:147) finished in 4880.317 s
15/02/03 21:45:28 INFO scheduler.DAGScheduler: looking for newly runnable stages
15/02/03 21:45:28 INFO scheduler.DAGScheduler: running: Set(Stage 3)
15/02/03 21:45:28 INFO scheduler.DAGScheduler: waiting: Set(Stage 6, Stage 7, Stage 8)
15/02/03 21:45:28 INFO scheduler.DAGScheduler: failed: Set()
15/02/03 21:45:28 INFO scheduler.DAGScheduler: Missing parents for Stage 6: List(Stage 3)
15/02/03 21:45:28 INFO scheduler.DAGScheduler: Missing parents for Stage 7: List(Stage 6)
15/02/03 21:45:28 INFO scheduler.DAGScheduler: Missing parents for Stage 8: List(Stage 7)

At this point, I see no activity for 1.5 hours except for this (XXX for I.P. address):
15/02/03 22:13:24 INFO util.AkkaUtils: Connecting to ExecutorActor: akka.tcp://sparkExecutor@ip-XXX.ec2.internal:36301/user/ExecutorActor

Then finally it started again:
15/02/03 23:31:34 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 3.0 (TID 7301) in 7208259 ms on ip-10-171-0-124.ec2.internal (3/4)
15/02/03 23:31:34 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 3.0 (TID 7300) in 7208503 ms on ip-10-171-0-128.ec2.internal (4/4)
15/02/03 23:31:34 INFO scheduler.DAGScheduler: Stage 3 (mapPartitions at Transposer.scala:211) finished in 7209.534 s
Re: ephemeral-hdfs vs persistent-hdfs - performance
Hey Joe,

With the ephemeral HDFS, you get the instance store of your worker nodes. For m3.xlarge that will be two 40 GB SSDs local to each instance, which are very fast. For the persistent HDFS, you get whatever EBS volumes the launch script configured. EBS volumes are always network drives, so the usual limitations apply. To optimize throughput, you can use EBS volumes with provisioned IOPS and you can use EBS-optimized instances. I don't have hard numbers at hand, but I'd expect this to be noticeably slower than using local SSDs.

As far as only using S3 goes, it depends on your use case (i.e. what you plan on doing with the data while it is there). If you store it there in between running different applications, you can likely work around consistency issues. Also, if you use Amazon's EMRFS to access data in S3, you can use their new consistency feature (https://aws.amazon.com/blogs/aws/emr-consistent-file-system/).

Hope this helps!
-Sven

On Tue, Feb 3, 2015 at 9:32 AM, Joe Wass jw...@crossref.org wrote:

The data is coming from S3 in the first place, and the results will be uploaded back there. But even in the same availability zone, fetching 170 GB (that's gzipped) is slow. From what I understand of the pipelines, multiple transforms on the same RDD might involve re-reading the input, which would very quickly add up in comparison to having the data locally -- unless I persisted the data (which I am in fact doing), but that would involve storing approximately the same amount of data in HDFS, which wouldn't fit. Also, I understood that S3 was unsuitable for practical use? See "Why you cannot use S3 as a replacement for HDFS"[0]. I'd love to be proved wrong, though; that would make things a lot easier.

[0] http://wiki.apache.org/hadoop/AmazonS3

On 3 February 2015 at 16:45, David Rosenstrauch dar...@darose.net wrote:

You could also just push the data to Amazon S3, which would un-link the size of the cluster needed to process the data from the size of the data.

DR

On 02/03/2015 11:43 AM, Joe Wass wrote:

I want to process about 800 GB of data on an Amazon EC2 cluster. So, I need to store the input in HDFS somehow. I currently have a cluster of 5 x m3.xlarge, each of which has 80GB disk. Each HDFS node reports 73 GB, and the total capacity is ~370 GB. If I want to process 800 GB of data (assuming I can't split the jobs up), I'm guessing I need to get persistent-hdfs involved.

1 - Does persistent-hdfs have noticeably different performance than ephemeral-hdfs?
2 - If so, is there a recommended configuration (like storing input and output on persistent, but persisted RDDs on ephemeral?)

This seems like a common use-case, so sorry if this has already been covered.

Joe
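If a larger persistent-hdfs is what you end up needing, a hedged example of sizing it at launch time; this assumes the spark-ec2 script in your Spark distribution supports the --ebs-vol-size flag (check spark-ec2 --help for your version), and the key pair name and file below are placeholders:

./spark-ec2 -k mykeypair -i mykeypair.pem -s 5 -t m3.xlarge \
    --ebs-vol-size=200 launch my-cluster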
Sort based shuffle not working properly?
I am trying to implement secondary sort in spark as we do in map-reduce. Here is my data (tab separated; the header row c1, c2, c3 is not part of the data):

c1  c2  c3
1   2   4
1   3   6
2   4   7
2   6   8
3   5   5
3   1   8
3   2   0

To do secondary sort, I create a paired RDD as ((c1 + "," + c2), row) and then use a custom partitioner to partition only on c1. I have set spark.shuffle.manager = SORT, so the keys per partition should be sorted. For the key 3 I am expecting to get (3, 1) (3, 2) (3, 5), but I still get the original order:

3,5
3,1
3,2

Here is the custom partitioner code:

class StraightPartitioner(p: Int) extends org.apache.spark.Partitioner {
  def numPartitions = p
  def getPartition(key: Any) = {
    key.asInstanceOf[String].split(",")(0).toInt
  }
}

and the driver code; please tell me what I am doing wrong:

val conf = new SparkConf().setAppName("MapInheritanceExample")
conf.set("spark.shuffle.manager", "SORT")
val sc = new SparkContext(conf)
val pF = sc.textFile(inputFile)

val log = LogFactory.getLog("MapFunctionTest")
val partitionedRDD = pF.map { x =>
  val arr = x.split("\t")
  (arr(0) + "," + arr(1), null)
}.partitionBy(new StraightPartitioner(10))

var outputRDD = partitionedRDD.mapPartitions(p => {
  p.map { case (o, n) => o }
})
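One hedged caveat worth checking, not from the thread itself: spark.shuffle.manager = SORT controls how shuffle files are written (sorted by partition id), and does not by itself guarantee key order inside a partition when you read it with mapPartitions. On Spark 1.2+ (newer than the 1.1.0 mentioned in the follow-up below), repartitionAndSortWithinPartitions is the usual way to get keys sorted within each partition; a sketch reusing the names above:

import org.apache.spark.SparkContext._ // pair-RDD implicits on older Sparks

// same key construction and partitioner, but with a guaranteed
// per-partition sort on the composite "c1,c2" key
val partitioned = pF.map { x =>
  val arr = x.split("\t")
  (arr(0) + "," + arr(1), null)
}.repartitionAndSortWithinPartitions(new StraightPartitioner(10))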
Re: Sort based shuffle not working properly?
Just to add, I am using Spark 1.1.0.
Re: Spark (SQL) as OLAP engine
A great presentation by Evan Chan on utilizing Cassandra, as Jonathan noted, is at: OLAP with Cassandra and Spark http://www.slideshare.net/EvanChan2/2014-07olapcassspark

On Tue Feb 03 2015 at 10:03:34 AM Jonathan Haddad j...@jonhaddad.com wrote:

Write out the RDD to a Cassandra table. The DataStax driver provides saveToCassandra() for this purpose.

On Tue Feb 03 2015 at 8:59:15 AM Adamantios Corais adamantios.cor...@gmail.com wrote:

Hi,

After some research I have decided that Spark (SQL) would be ideal for building an OLAP engine. My goal is to push aggregated data (to Cassandra or other low-latency data storage) and then be able to project the results on a web page (web service). New data will be added (aggregated) once a day, only. On the other hand, the web service must be able to run some fixed(?) queries (either on Spark or Spark SQL) at any time and plot the results with D3.js. Note that I can already achieve similar speeds while in REPL mode by caching the data. Therefore, I believe that my problem must be re-phrased as follows: How can I automatically cache the data once a day and make it available to a web service that is capable of running any Spark or Spark (SQL) statement in order to plot the results with D3.js? Note that I already have some experience in Spark (+ Spark SQL) as well as D3.js, but not at all with OLAP engines (at least in their traditional form). Any ideas or suggestions?

// Adamantios
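A hedged sketch of the saveToCassandra() step Jonathan mentions, assuming the DataStax spark-cassandra-connector is on the classpath and that a keyspace and table with these (hypothetical) names already exist:

import com.datastax.spark.connector._

// push one day's aggregates to a low-latency store for the web service to read
val dailyAggregates = sc.parallelize(Seq(("2015-02-03", "pageviews", 12345L)))
dailyAggregates.saveToCassandra("olap", "daily_aggregates",
  SomeColumns("day", "metric", "total"))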
Re: 2GB limit for partitions?
That is fairly out of date (we used to run some of our jobs on it... but that is forked off 1.1 actually).

Regards,
Mridul

On Tuesday, February 3, 2015, Imran Rashid iras...@cloudera.com wrote:

Thanks for the explanations, makes sense. For the record, it looks like this was worked on a while back (and maybe the work is even close to a solution?) https://issues.apache.org/jira/browse/SPARK-1476 and perhaps an independent solution was worked on here? https://issues.apache.org/jira/browse/SPARK-1391

On Tue, Feb 3, 2015 at 5:20 PM, Reynold Xin r...@databricks.com wrote:

cc dev list

How are you saving the data? There are two relevant 2GB limits:
1. Caching
2. Shuffle

For caching, a partition is turned into a single block. For shuffle, each map partition is partitioned into R blocks, where R = number of reduce tasks. It is unlikely a shuffle block would be > 2G, although it can still happen. I think the 2nd problem is easier to fix than the 1st, because we can handle that in the network transport layer. It'd require us to divide the transfer of a very large block into multiple smaller blocks.

On Tue, Feb 3, 2015 at 3:00 PM, Imran Rashid iras...@cloudera.com wrote:

Michael, you are right, there is definitely some limit at 2GB. Here is a trivial example to demonstrate it:

import org.apache.spark.storage.StorageLevel
val d = sc.parallelize(1 to 1e6.toInt, 1)
  .map { i => new Array[Byte](5e3.toInt) }
  .persist(StorageLevel.DISK_ONLY)
d.count()

It gives the same error you are observing. I was under the same impression as Sean about the limits only being on blocks, not partitions -- but clearly that isn't the case here. I don't know the whole story yet, but I just wanted to at least let you know you aren't crazy :) At the very least this suggests that you might need to make smaller partitions for now.

Imran

On Tue, Feb 3, 2015 at 4:58 AM, Michael Albert m_albert...@yahoo.com.invalid wrote:

Greetings! Thanks for the response. Below is an example of the exception I saw. I'd rather not post code at the moment, so I realize it is completely unreasonable to ask for a diagnosis. However, I will say that adding a partitionBy() was the last change before this error was created. Thanks for your time and any thoughts you might have.
Sincerely,
Mike

Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 5.0 failed 4 times, most recent failure: Lost task 4.3 in stage 5.0 (TID 6012, ip-10-171-0-31.ec2.internal): java.lang.RuntimeException: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307)
at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)

From: Sean Owen so...@cloudera.com
To: Michael Albert m_albert...@yahoo.com
Cc: user@spark.apache.org
Sent: Monday, February 2, 2015 10:13 PM
Subject: Re: 2GB limit for partitions?

The limit is on blocks, not partitions. Partitions have many blocks. It sounds like you are creating very large values in memory, but I'm not sure given your description. You will run into problems if a single object is more than 2GB, of course. More of the stack trace might show what is mapping that much memory. If you simply want data into 1000 files it's a lot simpler. Just repartition into 1000 partitions and save the data. If you need more control over what goes into which partition, use a Partitioner, yes.

On Mon, Feb 2, 2015 at 8:40 PM, Michael Albert m_albert...@yahoo.com.invalid wrote:

Greetings! SPARK-1476 says that there is a 2G limit for blocks. Is this the same as a 2G limit for partitions (or approximately so)?
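A hedged back-of-the-envelope illustration of the two limits Reynold distinguishes above (the sizes are arbitrary round numbers, not from the thread):

// caching: one partition becomes one block, so a 5 GB cached partition
// blows through the 2 GB (Integer.MAX_VALUE bytes) block limit
val partitionBytes = 5L * 1024 * 1024 * 1024
val blockLimit = Int.MaxValue.toLong
assert(partitionBytes > blockLimit)

// shuffle: each map partition is split across R reduce blocks, so the same
// 5 GB shuffled to R = 1000 reducers yields ~5 MB blocks -- far below the
// limit unless the key distribution is heavily skewed
val reducers = 1000
val shuffleBlockBytes = partitionBytes / reducers
assert(shuffleBlockBytes < blockLimit)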