Re: Rolling upgrade Spark cluster
HDFS rolling upgrade is in Hadoop 2.6 (available since 2.4): http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/HdfsRollingUpgrade.html Some parts of NM and RM work-preserving restart were released in Hadoop 2.6.0: YARN-1367 (after restart, the NM should resync with the RM without killing containers) and YARN-1337 (recover containers upon NodeManager restart). The umbrella tickets YARN-556 and YARN-1336 are still open. Thanks, Bhaskar

On Wed, Dec 17, 2014 at 12:10 PM, Kenichi Maehashi webmas...@kenichimaehashi.com wrote: Hi, I have a Spark cluster using standalone mode. The Spark Master is configured in High Availability mode. Now I am going to upgrade Spark from 1.0 to 1.1, but I don't want to interrupt the currently running jobs. (1) Is there any way to perform a rolling upgrade (while running a job)? (2) If not, when using YARN as the cluster manager, can I perform a rolling upgrade? Thanks, Kenichi -- Kenichi Maehashi webmas...@kenichimaehashi.com - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
wordcount job slow while input from NFS mount
A wordcount job on a ~1 GB text file takes 1 hour when the input comes from an NFS mount. The same job took 30 seconds with input from the local file system. Is there any tuning required for NFS-mounted input? Thanks Larry
Re: when will the spark 1.3.0 be released?
Hi All, When will Spark 1.2.0 be released? And what are the features in Spark 1.2.0? Regards, Rajesh

On Wed, Dec 17, 2014 at 11:14 AM, Andrew Ash and...@andrewash.com wrote: Releases are roughly every 3 months, so you should expect around March if the pace stays steady.

2014-12-16 22:56 GMT-05:00 Marco Shaw marco.s...@gmail.com: When it is ready.

On Dec 16, 2014, at 11:43 PM, 张建轶 zhangjia...@youku.com wrote: Hi! When will Spark 1.3.0 be released? I want to use the new LDA feature. Thank you!
Re: pyspark sc.textFile uses only 4 out of 32 threads per node
Rui is correct. Check how many partitions your RDD has after loading the gzipped files, e.g. rdd.getNumPartitions(). If that number is way less than the number of cores in your cluster (in your case I suspect it is 4), then explicitly repartition the RDD to match the number of cores in your cluster, or some multiple thereof. For example:

new_rdd = rdd.repartition(sc.defaultParallelism * 3)

Operations on new_rdd should utilize all the cores in your cluster. Nick

On Wed Dec 17 2014 at 1:42:16 AM Sun, Rui rui@intel.com wrote: Gautham, How many gz files do you have? Maybe the reason is that a gz file is compressed in a format that can't be split for processing by MapReduce. A single gz file can only be processed by a single mapper, so the CPU threads can't be fully utilized.

-----Original Message----- From: Gautham [mailto:gautham.a...@gmail.com] Sent: Wednesday, December 10, 2014 3:00 AM To: u...@spark.incubator.apache.org Subject: pyspark sc.textFile uses only 4 out of 32 threads per node

I am having an issue with pyspark launched on EC2 (using spark-ec2) with 5 r3.4xlarge machines, where each has 32 threads and 240 GB of RAM. When I do sc.textFile to load data from a number of gz files, it does not progress as fast as expected. When I log in to a child node and run top, I see only 4 threads at 100% CPU. All remaining 28 cores were idle. This is not an issue when processing the strings after loading, when all the cores are used to process the data. Can you please help me with this? What setting can be changed to get the CPU usage back up to full? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-sc-textFile-uses-only-4-out-of-32-threads-per-node-tp20595.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
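Sun Rui's point about gzip can be seen without Spark at all: a gzip stream must be decompressed from its header, so a worker cannot start reading in the middle of the file the way it can with plain text. A standalone Python sketch (illustration only, not Spark code):

```python
import gzip
import zlib

data = b"hello world\n" * 1000
compressed = gzip.compress(data)

# Plain text can be split at any byte offset and each piece handled
# independently -- which is what lets Spark give one file many partitions.
half = len(data) // 2
assert data[:half] + data[half:] == data

# A gzip stream cannot: a chunk taken from the middle has no gzip header,
# so it fails to decompress. Hence one .gz file -> one task.
try:
    gzip.decompress(compressed[len(compressed) // 2:])
    splittable = True
except (OSError, zlib.error, EOFError):
    splittable = False
```

So with only a few gz files you get at most that many busy cores during the load, until you repartition as above.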
Re: Unable to start Spark 1.3 after building: java.lang.NoClassDefFoundError: org/codehaus/jackson/map/deser/std/StdDeserializer
Spark 1.3 does not exist. Spark 1.2 hasn't been released just yet. Which version of Spark did you mean? Also, from what I can see in the docs http://spark.apache.org/docs/1.1.1/building-with-maven.html#specifying-the-hadoop-version, I believe the latest version of Hadoop that Spark supports is 2.4, not 2.6. Nick On Wed Dec 17 2014 at 2:09:56 AM Kyle Lin kylelin2...@gmail.com wrote: I also got the same problem.. 2014-12-09 22:58 GMT+08:00 Daniel Haviv danielru...@gmail.com: Hi, I've built spark 1.3 with hadoop 2.6 but when I startup the spark-shell I get the following exception: 14/12/09 06:54:24 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040 14/12/09 06:54:24 INFO util.Utils: Successfully started service 'SparkUI' on port 4040. 14/12/09 06:54:24 INFO ui.SparkUI: Started SparkUI at http://hdname:4040 14/12/09 06:54:25 INFO impl.TimelineClientImpl: Timeline service address: http://0.0.0.0:8188/ws/v1/timeline/ java.lang.NoClassDefFoundError: org/codehaus/jackson/map/deser/std/StdDeserializer at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:800) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) Any idea why ? Thanks, Daniel
When will Spark SQL support building DB index natively?
Hi, The Spark SQL documentation says: "Some of these (such as indexes) are less important due to Spark SQL's in-memory computational model. Others are slotted for future releases of Spark SQL. - Block level bitmap indexes and virtual columns (used to build indexes)". For our use cases, a DB index is quite important. We have about 300 GB of data in our database, and we always use customer id as a predicate for lookups. Without an index, we have to scan all 300 GB, and a simple lookup takes 1 minute, while MySQL takes only 10 seconds. We tried creating an independent table for each customer id; the result is pretty good, but the logic becomes very complex. I'm wondering when Spark SQL will support DB indexes, and until then, is there an alternative way to get index-like behavior? Thanks
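To illustrate why the per-customer-table workaround helps (a language-agnostic Python sketch with made-up names, not Spark SQL internals): a predicate lookup without an index is a full scan, while any keyed structure -- an index, or a partition per key -- narrows the work to only the matching rows:

```python
# Hypothetical rows keyed by customer_id; 10,000 rows, 100 distinct customers.
rows = [{"customer_id": i % 100, "amount": i} for i in range(10_000)]

# Full scan: what happens without an index -- every row is examined.
scan_hits = [r for r in rows if r["customer_id"] == 42]

# Build a hash index once, then look up directly -- the scan disappears.
index = {}
for r in rows:
    index.setdefault(r["customer_id"], []).append(r)
indexed_hits = index.get(42, [])
```

Partitioning the data by customer id (e.g. one directory or table per customer) achieves the same effect at the storage layer: the query only touches the matching partition instead of all 300 GB.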
SchemaRDD.sample problem
Hi, I am using Spark SQL on the 1.2.1 branch. The problem comes from the following 4-line code:

val t1: SchemaRDD = hiveContext.hql("select * from product where is_new = 0")
val tb1: SchemaRDD = t1.sample(withReplacement = false, fraction = 0.05)
tb1.registerTempTable("t1_tmp")
(hiveContext.sql("select count(*) from t1_tmp where is_new = 1")).collect().foreach(println)

We know that t1 contains only rows whose is_new field is zero. After sampling t1 by taking 5% of its rows, the sampled table should always contain only rows where is_new = 0. However, line 4 returns a count of about 5. That means there are some rows where is_new = 1 in the sampled table, which should not be logically possible. I am not sure SchemaRDD.sample is doing its work correctly. Any idea? Hao -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SchemaRDD-sample-problem-tp20741.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
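The invariant Hao expects can be stated as a standalone sketch (plain Python mimicking Bernoulli sampling at fraction 0.05, not the SchemaRDD implementation): sampling can only drop rows, never introduce rows that were not in its input.

```python
import random

random.seed(0)
# t1 mimics the filtered table: every row satisfies is_new == 0.
t1 = [{"id": i, "is_new": 0} for i in range(10_000)]

# Bernoulli sampling without replacement at fraction 0.05:
sample = [row for row in t1 if random.random() < 0.05]
```

Every sampled row must still have is_new == 0; a nonzero count for is_new = 1, as reported, would point to a bug in the sampler (or in how the sampled RDD is recomputed) rather than expected behavior.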
Re: Unable to start Spark 1.3 after building: java.lang.NoClassDefFoundError: org/codehaus/jackson/map/deser/std/StdDeserializer
Spark works fine with 2.4 *and later*. The docs don't mean to imply 2.4 is the last supported version. On Wed, Dec 17, 2014 at 10:19 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Spark 1.3 does not exist. Spark 1.2 hasn't been released just yet. Which version of Spark did you mean? Also, from what I can see in the docs, I believe the latest version of Hadoop that Spark supports is 2.4, not 2.6. Nick On Wed Dec 17 2014 at 2:09:56 AM Kyle Lin kylelin2...@gmail.com wrote: I also got the same problem.. 2014-12-09 22:58 GMT+08:00 Daniel Haviv danielru...@gmail.com: Hi, I've built spark 1.3 with hadoop 2.6 but when I startup the spark-shell I get the following exception: 14/12/09 06:54:24 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040 14/12/09 06:54:24 INFO util.Utils: Successfully started service 'SparkUI' on port 4040. 14/12/09 06:54:24 INFO ui.SparkUI: Started SparkUI at http://hdname:4040 14/12/09 06:54:25 INFO impl.TimelineClientImpl: Timeline service address: http://0.0.0.0:8188/ws/v1/timeline/ java.lang.NoClassDefFoundError: org/codehaus/jackson/map/deser/std/StdDeserializer at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:800) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) Any idea why ? Thanks, Daniel
Re: java.lang.IllegalStateException: unread block data
I found the solution. I had HADOOP_MAPRED_HOME set in my environment, which clashes with Spark. After I set HADOOP_MAPRED_HOME to empty, Spark started working. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-IllegalStateException-unread-block-data-tp20668p20742.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Intermittent test failures
Using TestSQLContext from multiple tests leads to:

SparkException: Task not serializable

ERROR ContextCleaner: Error cleaning broadcast 10
java.lang.NullPointerException
at org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:246)
at org.apache.spark.broadcast.TorrentBroadcastFactory.unbroadcast(TorrentBroadcastFactory.scala:46)
at org.apache.spark.broadcast.BroadcastManager.unbroadcast(BroadcastManager.scala:66)
at org.apache.spark.ContextCleaner.doCleanupBroadcast(ContextCleaner.scala:185)
at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:147)
at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:138)
at scala.Option.foreach(Option.scala:236)

On 15.12.2014, at 22:36, Marius Soutier mps@gmail.com wrote: Ok, maybe these test versions will help me then. I'll check it out.

On 15.12.2014, at 22:33, Michael Armbrust mich...@databricks.com wrote: Using a single SparkContext should not cause this problem. In the SQL tests we use TestSQLContext and TestHive, which are global singletons for all of our unit testing.

On Mon, Dec 15, 2014 at 1:27 PM, Marius Soutier mps@gmail.com wrote: Possible, yes, although I'm trying everything I can to prevent it, i.e. fork in Test := true and isolated tests. Can you confirm that reusing a single SparkContext for multiple tests poses a problem as well? Other than that, just switching from SQLContext to HiveContext also provoked the error.

On 15.12.2014, at 20:22, Michael Armbrust mich...@databricks.com wrote: Is it possible that you are starting more than one SparkContext in a single JVM without stopping previous ones? I'd try testing with Spark 1.2, which will throw an exception in this case.
On Mon, Dec 15, 2014 at 8:48 AM, Marius Soutier mps@gmail.com wrote: Hi, I'm seeing strange, random errors when running unit tests for my Spark jobs. In this particular case I'm using Spark SQL to read and write Parquet files, and one error that I keep running into is this one:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 19 in stage 6.0 failed 1 times, most recent failure: Lost task 19.0 in stage 6.0 (TID 223, localhost): java.io.IOException: PARSING_ERROR(2)
org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:78)
org.xerial.snappy.SnappyNative.uncompressedLength(Native Method)
org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:545)

I can only prevent this from happening by using isolated Specs tests that always create a new SparkContext that is not shared between tests (there can also be only a single SparkContext per test), and also by using the standard SQLContext instead of HiveContext. It does not seem to have anything to do with the actual files that I also create during the test run with SQLContext.saveAsParquetFile.
Cheers - Marius

PS The full trace:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 19 in stage 6.0 failed 1 times, most recent failure: Lost task 19.0 in stage 6.0 (TID 223, localhost): java.io.IOException: PARSING_ERROR(2)
org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:78)
org.xerial.snappy.SnappyNative.uncompressedLength(Native Method)
org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:545)
org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:125)
org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:88)
org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:58)
org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:128)
org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:232)
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readObject$1.apply$mcV$sp(TorrentBroadcast.scala:169)
org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:927)
org.apache.spark.broadcast.TorrentBroadcast.readObject(TorrentBroadcast.scala:155)
sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:606)
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
Hadoop and spark together
Hi, I'm trying to use Hadoop and Spark together, but they don't work together. If I set HADOOP_MAPRED_HOME to use MRv1 or MRv2, Spark stops working. If I set HADOOP_MAPRED_HOME to empty to use Spark, Hadoop stops working. I use Cloudera 5.1.3. Best regards, Morbious -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Hadoop-and-spark-together-tp20743.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
weird bytecode incompatibility issue between spark-core jar from mvn repo and official spark prebuilt binary
Hi, I encountered a weird bytecode incompatibility issue between the spark-core jar from the mvn repo and the official Spark prebuilt binary. Steps to reproduce:

1. Download the official pre-built Spark binary 1.1.1 at http://d3kbcqa49mib13.cloudfront.net/spark-1.1.1-bin-hadoop1.tgz
2. Launch the Spark cluster in pseudo-cluster mode
3. Run a small Scala app which calls RDD.saveAsObjectFile():

scalaVersion := "2.10.4"
libraryDependencies ++= Seq("org.apache.spark" %% "spark-core" % "1.1.1")

val sc = new SparkContext(args(0), "test") // args(0) is the Spark master URI
val rdd = sc.parallelize(List(1, 2, 3))
rdd.saveAsObjectFile("/tmp/mysaoftmp")
sc.stop()

This throws an exception as follows:

[error] (run-main-0) org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 6, ray-desktop.sh.intel.com): java.lang.ClassCastException: scala.Tuple2 cannot be cast to scala.collection.Iterator
[error] org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
[error] org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
[error] org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
[error] org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
[error] org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
[error] org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
[error] org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
[error] org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
[error] org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
[error] org.apache.spark.scheduler.Task.run(Task.scala:54)
[error] org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
[error] java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146)
[error] java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
[error] java.lang.Thread.run(Thread.java:701)

After investigation, I found that this is caused by a bytecode incompatibility issue between RDD.class in spark-core_2.10-1.1.1.jar and the pre-built Spark assembly. This issue also happens with Spark 1.1.0. Is there anything wrong in my usage of Spark? Or anything wrong in the process of deploying Spark module jars to the maven repo?
Re: weird bytecode incompatibility issue between spark-core jar from mvn repo and official spark prebuilt binary
You should use the same binaries everywhere. The problem here is that anonymous functions get compiled to different names when you build differently (potentially), so you actually have one function being called when another function is meant.

On Wed, Dec 17, 2014 at 12:07 PM, Sun, Rui rui@intel.com wrote: Hi, I encountered a weird bytecode incompatibility issue between the spark-core jar from the mvn repo and the official Spark prebuilt binary. Steps to reproduce:

1. Download the official pre-built Spark binary 1.1.1 at http://d3kbcqa49mib13.cloudfront.net/spark-1.1.1-bin-hadoop1.tgz
2. Launch the Spark cluster in pseudo-cluster mode
3. Run a small Scala app which calls RDD.saveAsObjectFile():

scalaVersion := "2.10.4"
libraryDependencies ++= Seq("org.apache.spark" %% "spark-core" % "1.1.1")

val sc = new SparkContext(args(0), "test") // args(0) is the Spark master URI
val rdd = sc.parallelize(List(1, 2, 3))
rdd.saveAsObjectFile("/tmp/mysaoftmp")
sc.stop()

This throws an exception as follows:

[error] (run-main-0) org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 6, ray-desktop.sh.intel.com): java.lang.ClassCastException: scala.Tuple2 cannot be cast to scala.collection.Iterator
[error] org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
[error] org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
[error] org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
[error] org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
[error] org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
[error] org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
[error] org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
[error] org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
[error] org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
[error] org.apache.spark.scheduler.Task.run(Task.scala:54)
[error] org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
[error] java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146)
[error] java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
[error] java.lang.Thread.run(Thread.java:701)

After investigation, I found that this is caused by a bytecode incompatibility issue between RDD.class in spark-core_2.10-1.1.1.jar and the pre-built Spark assembly. This issue also happens with Spark 1.1.0. Is there anything wrong in my usage of Spark? Or anything wrong in the process of deploying Spark module jars to the maven repo?
Re: java.lang.IllegalStateException: unread block data
I have the same issue. Can anyone help, please? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-IllegalStateException-unread-block-data-tp20668p20745.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: when will the spark 1.3.0 be released?
Spark 1.2.0 is coming in the next 48 hours according to http://apache-spark-developers-list.1001551.n3.nabble.com/RESULT-VOTE-Release-Apache-Spark-1-2-0-RC2-tc9815.html

On Wed, Dec 17, 2014 at 10:11 AM, Madabhattula Rajesh Kumar mrajaf...@gmail.com wrote: Hi All, When will Spark 1.2.0 be released? And what are the features in Spark 1.2.0? Regards, Rajesh

On Wed, Dec 17, 2014 at 11:14 AM, Andrew Ash and...@andrewash.com wrote: Releases are roughly every 3 months, so you should expect around March if the pace stays steady.

2014-12-16 22:56 GMT-05:00 Marco Shaw marco.s...@gmail.com: When it is ready.

On Dec 16, 2014, at 11:43 PM, 张建轶 zhangjia...@youku.com wrote: Hi! When will Spark 1.3.0 be released? I want to use the new LDA feature. Thank you!
Are lazy values created once per node or once per partition?
Hello, Say I have the following code:

val something = new Something()
someRdd.foreachRdd(something.someMethod)

And in something, I have a lazy member variable that gets created in something.someMethod. Would that lazy value be created once per node, or once per partition? Thanks, Ashic.
Apache Spark 1.1.1 with Hbase 0.98.8-hadoop2 and hadoop 2.3.0
Hi All, I have downloaded the pre-built Spark 1.1.1 for Hadoop 2.3.0, then ran mvn install for the jar spark-assembly-1.1.1-hadoop2.3.0.jar available in the lib folder of the Spark download, and added its dependency as follows in my Java program:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.1.1</version>
  <exclusions>
    <exclusion>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-auth</artifactId>
    </exclusion>
    <exclusion>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-yarn-common</artifactId>
    </exclusion>
    <exclusion>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-core</artifactId>
    </exclusion>
  </exclusions>
</dependency>

I have attached my pom.xml. But when I try to do hBaseRDD.count(), I am getting the following exception: java.lang.IllegalStateException (unread block data) [duplicate 1] pom.xml http://apache-spark-user-list.1001560.n3.nabble.com/file/n20746/pom.xml -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Apache-Spark-1-1-1-with-Hbase-0-98-8-hadoop2-and-hadoop-2-3-0-tp20746.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: spark streaming kafka best practices ?
Patrick, I was wondering why one would choose rdd.map vs rdd.foreach to execute a side-effecting function on an RDD. -kr, Gerard.

On Sat, Dec 6, 2014 at 12:57 AM, Patrick Wendell pwend...@gmail.com wrote: The second choice is better. Once you call collect() you are pulling all of the data onto a single node; you want to do most of the processing in parallel on the cluster, which is what map() will do. Ideally you'd try to summarize the data or reduce it before calling collect().

On Fri, Dec 5, 2014 at 5:26 AM, david david...@free.fr wrote: hi, What is the best way to process a batch window in Spark Streaming:

kafkaStream.foreachRDD(rdd => {
  rdd.collect().foreach(event => {
    // process the event
    process(event)
  })
})

Or:

kafkaStream.foreachRDD(rdd => {
  rdd.map(event => {
    // process the event
    process(event)
  }).collect()
})

thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-kafa-best-practices-tp20470.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
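One relevant difference, sketched with plain Python iterators (Spark's transformations are lazy in an analogous way, though this is not Spark code): map produces nothing until something forces it, while an eager action runs immediately.

```python
log = []

# map is lazy: building the map object runs no side effects yet,
# just like a Spark transformation before any action is called.
lazy = map(log.append, [1, 2, 3])
assert log == []

# Forcing the iterator (analogous to collect()) runs the side effect.
forced = list(lazy)
assert log == [1, 2, 3]
```

So rdd.map(process).collect() only runs the side effect because collect() forces it, and it also ships all results back to the driver; rdd.foreach(process) runs the side effect on the executors without that round trip.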
spark-ec2 starts hdfs1, tachyon but not spark
Hi All: I am new to Spark. I recently checked out and built Spark 1.2 RC2 as an assembly. I then ran spark-ec2 according to: http://spark.apache.org/docs/latest/ec2-scripts.html I got master and slave instances in EC2 after running ./src/spark/ec2/spark-ec2 -k mykey -i mykey.pem -s 1 launch myclus. All seemed to run OK. However, I got no web UIs for the Spark master or slave. Logging into the nodes, I see HDFS and Tachyon processes but none for Spark. The /root/tachyon folder has a full complement of files including conf, logs and so forth:

$ ls /root/tachyon
bin docs libexec logs README.md target conf journal LICENSE pom.xml src

The /root/spark folder only has a conf dir:

$ ls /root/spark
conf

If I try to run the Spark setup script I see errors like:

Setting up spark-standalone
RSYNC'ing /root/spark/conf to slaves...
ec2-some-ip.compute-1.amazonaws.com
./spark-standalone/setup.sh: line 22: /root/spark/sbin/stop-all.sh: No such file or directory
./spark-standalone/setup.sh: line 27: /root/spark/sbin/start-master.sh: No such file or directory
./spark-standalone/setup.sh: line 33: /root/spark/sbin/start-slaves.sh: No such file or directory

This makes it seem that something did not get unpacked properly for Spark. Any hints or workarounds for fixing this? Cheers, Al
Re: Are lazy values created once per node or once per partition?
I would think that it has to be per worker. On Wed, Dec 17, 2014, 6:32 PM Ashic Mahtab as...@live.com wrote: Hello, Say, I have the following code: let something = Something() someRdd.foreachRdd(something.someMethod) And in something, I have a lazy member variable that gets created in something.someMethod. Would that lazy be created once per node, or once per partition? Thanks, Ashic.
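A process-local sketch (plain Python, not the Spark API) of the "once per worker" behavior: when the lazy value lives in something process-wide, such as a Scala singleton object on an executor, every partition handled by that worker reuses the same instance. One caveat worth noting: a lazy val on an object that is serialized with the task closure is re-initialized in each deserialized copy, which can mean once per task instead.

```python
import functools

calls = []  # records every real initialization

@functools.lru_cache(maxsize=None)
def shared_resource():
    # Expensive setup (e.g. a connection); runs only on the first call in
    # this process, much like a lazy val in a JVM-wide singleton object.
    calls.append(1)
    return {"conn": "open"}

# Simulate several partitions handled by the same worker process:
results = [shared_resource() for _partition in range(8)]
```

All eight "partitions" receive the identical object; the initialization body ran exactly once.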
RE: Control default partition when load a RDD from HDFS
Nice, that is the answer I want. Thanks!

From: Sun, Rui [mailto:rui@intel.com] Sent: Wednesday, December 17, 2014 1:30 AM To: Shuai Zheng; user@spark.apache.org Subject: RE: Control default partition when load a RDD from HDFS

Hi Shuai, How did you turn off the file split in Hadoop? I guess you might have implemented a customized FileInputFormat which overrides isSplitable() to return false. If you do have such a FileInputFormat, you can simply pass it as a constructor parameter to HadoopRDD or NewHadoopRDD in Spark.

From: Shuai Zheng [mailto:szheng.c...@gmail.com] Sent: Wednesday, December 17, 2014 4:16 AM To: user@spark.apache.org Subject: Control default partition when load a RDD from HDFS

Hi All, My application loads 1000 files, each from 200 MB to a few GB, and combines them with other data to do calculations. Some pre-calculation must be done at the file level; after that, the results need to be combined for further calculation. In Hadoop this is simple: I can turn off file splitting in the input format (to force each file to go to a single mapper), do the file-level calculation in the mapper, and pass the result to the reducer. But in Spark, how can I do it? Basically I want to make sure that after I load these files into an RDD, it is partitioned by file (no splitting and no merging), so I can call mapPartitions. Is there any way to control the default partitioning when I load the RDD? This might be the default behavior when Spark first loads the RDD (partitioned by file), but I can't find any documentation to support my guess; if it isn't, can I enforce this kind of partitioning? Because the total file size is large, I don't want to repartition in the code. Regards, Shuai
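The shape of the computation (hypothetical file names, plain Python rather than the Hadoop API) is: treat each file as one unsplittable unit, run the file-level pre-calculation per unit, then combine -- which is what an isSplitable() = false input format plus mapPartitions gives you in Spark.

```python
# Hypothetical per-file inputs standing in for HDFS files.
files = {
    "part-0001.txt": ["1", "2"],
    "part-0002.txt": ["3", "4", "5"],
}

def per_file_calc(lines):
    # File-level pre-calculation that must see the whole file at once.
    return sum(int(x) for x in lines)

# One "partition" per file, never split and never merged:
partials = [per_file_calc(lines) for lines in files.values()]

# Combine step across files (the "reducer" side):
total = sum(partials)
```

With the non-splittable input format, each mapPartitions call sees exactly one file's records, so per_file_calc maps directly onto a partition function.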
Re: Apache Spark 1.1.1 with Hbase 0.98.8-hadoop2 and hadoop 2.3.0
Have you seen this thread? http://search-hadoop.com/m/JW1q5FS8Mr1 If the problem you encountered is different, please give the full stack trace. Cheers

On Wed, Dec 17, 2014 at 5:43 AM, Amit Singh Hora hora.a...@gmail.com wrote: Hi All, I have downloaded the pre-built Spark 1.1.1 for Hadoop 2.3.0, then ran mvn install for the jar spark-assembly-1.1.1-hadoop2.3.0.jar available in the lib folder of the Spark download, and added its dependency as follows in my Java program:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.1.1</version>
  <exclusions>
    <exclusion>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-auth</artifactId>
    </exclusion>
    <exclusion>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-yarn-common</artifactId>
    </exclusion>
    <exclusion>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-core</artifactId>
    </exclusion>
  </exclusions>
</dependency>

I have attached my pom.xml. But when I try to do hBaseRDD.count(), I am getting the following exception: java.lang.IllegalStateException (unread block data) [duplicate 1] pom.xml http://apache-spark-user-list.1001560.n3.nabble.com/file/n20746/pom.xml -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Apache-Spark-1-1-1-with-Hbase-0-98-8-hadoop2-and-hadoop-2-3-0-tp20746.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
RE: Control default partition when load a RDD from HDFS
Why wouldn't it be a good option to create an RDD for each 200 MB file and then apply the pre-calculations before merging them? I think the partitioning of each RDD should be transparent to the pre-calculations, rather than fixed, so that Spark can optimize its map/reduce processing.

From: Shuai Zheng [mailto:szheng.c...@gmail.com] Sent: Wednesday, December 17, 2014 16:01 To: 'Sun, Rui'; user@spark.apache.org Subject: RE: Control default partition when load a RDD from HDFS

Nice, that is the answer I want. Thanks!

From: Sun, Rui [mailto:rui@intel.com] Sent: Wednesday, December 17, 2014 1:30 AM To: Shuai Zheng; user@spark.apache.org Subject: RE: Control default partition when load a RDD from HDFS

Hi Shuai, How did you turn off the file split in Hadoop? I guess you might have implemented a customized FileInputFormat which overrides isSplitable() to return false. If you do have such a FileInputFormat, you can simply pass it as a constructor parameter to HadoopRDD or NewHadoopRDD in Spark.

From: Shuai Zheng [mailto:szheng.c...@gmail.com] Sent: Wednesday, December 17, 2014 4:16 AM To: user@spark.apache.org Subject: Control default partition when load a RDD from HDFS

Hi All, My application loads 1000 files, each from 200 MB to a few GB, and combines them with other data to do calculations. Some pre-calculation must be done at the file level; after that, the results need to be combined for further calculation. In Hadoop this is simple: I can turn off file splitting in the input format (to force each file to go to a single mapper), do the file-level calculation in the mapper, and pass the result to the reducer. But in Spark, how can I do it? Basically I want to make sure that after I load these files into an RDD, it is partitioned by file (no splitting and no merging), so I can call mapPartitions. Is there any way to control the default partitioning when I load the RDD? This might be the default behavior when Spark first loads the RDD (partitioned by file), but I can't find any documentation to support my guess; if it isn't, can I enforce this kind of partitioning? Because the total file size is large, I don't want to repartition in the code. Regards, Shuai

Disclaimer: http://disclaimer.agbar.com
Get the value of DStream[(String, Iterable[String])]
I'm a newbie with Spark... a simple question: val errorLines = lines.filter(_.contains("h")) val mapErrorLines = errorLines.map(line => ("key", line)) val grouping = errorLinesValue.groupByKeyAndWindow(Seconds(8), Seconds(4)) I get something like: 604: --- 605: Time: 141883218 ms 606: --- 607: (key,ArrayBuffer(h2, h3, h4)) Now, I would like to get that ArrayBuffer and count the number of elements. How could I get that ArrayBuffer? Something like: val values = grouping.getValue()... How could I do this in Spark with Scala? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark SQL 1.1.1 reading LZO compressed json files
Hi spark users, Do you know how to read json files using Spark SQL that are LZO compressed? I'm looking into sqlContext.jsonFile but I don't know how to configure it to read lzo files. Best Regards, Jerry
Re: Get the value of DStream[(String, Iterable[String])]
You can create a DStream that contains the count, transforming the grouped windowed RDD, like this: val errorCount = grouping.map{ case (k, v) => v.size } If you need to preserve the key: val errorCount = grouping.map{ case (k, v) => (k, v.size) } Or, if you don't care about the content of the values, you could count directly instead of grouping first: val errorCount = mapErrorLines.countByWindow(Seconds(8), Seconds(4)) Not sure why you're using map(line => ("key", line)) as there only seems to be one key. If that's not required, we can simplify one more step: val errorCount = errorLines.countByWindow(Seconds(8), Seconds(4)) The question is: what do you want to do with that count afterwards? -kr, Gerard. On Wed, Dec 17, 2014 at 5:11 PM, Guillermo Ortiz konstt2...@gmail.com wrote: I'm a newbie with Spark... a simple question: val errorLines = lines.filter(_.contains("h")) val mapErrorLines = errorLines.map(line => ("key", line)) val grouping = errorLinesValue.groupByKeyAndWindow(Seconds(8), Seconds(4)) I get something like: 604: --- 605: Time: 141883218 ms 606: --- 607: (key,ArrayBuffer(h2, h3, h4)) Now, I would like to get that ArrayBuffer and count the number of elements. How could I get that ArrayBuffer? Something like: val values = grouping.getValue()... How could I do this in Spark with Scala? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark SQL 1.1.1 reading LZO compressed json files
Hi Ted, Thanks for your help. I'm able to read lzo files using sparkContext.newAPIHadoopFile but I couldn't do the same for sqlContext because sqlContext.jsonFile does not provide ways to configure the input file format. Do you know if there are some APIs to do that? Best Regards, Jerry On Wed, Dec 17, 2014 at 11:27 AM, Ted Yu yuzhih...@gmail.com wrote: See this thread: http://search-hadoop.com/m/JW1q5HAuFv which references https://issues.apache.org/jira/browse/SPARK-2394 Cheers On Wed, Dec 17, 2014 at 8:21 AM, Jerry Lam chiling...@gmail.com wrote: Hi spark users, Do you know how to read json files using Spark SQL that are LZO compressed? I'm looking into sqlContext.jsonFile but I don't know how to configure it to read lzo files. Best Regards, Jerry
Re: Spark SQL 1.1.1 reading LZO compressed json files
See this thread: http://search-hadoop.com/m/JW1q5HAuFv which references https://issues.apache.org/jira/browse/SPARK-2394 Cheers On Wed, Dec 17, 2014 at 8:21 AM, Jerry Lam chiling...@gmail.com wrote: Hi spark users, Do you know how to read json files using Spark SQL that are LZO compressed? I'm looking into sqlContext.jsonFile but I don't know how to configure it to read lzo files. Best Regards, Jerry
Who is using Spark and related technologies for bioinformatics applications?
I am aware of the ADAM project in Berkeley, and I am working on proteomic searches - is anyone else working in this space?
Re: Spark SQL 1.1.1 reading LZO compressed json files
In SQLContext: def jsonFile(path: String, samplingRatio: Double): SchemaRDD = { val json = sparkContext.textFile(path); jsonRDD(json, samplingRatio) } Looks like jsonFile() can be enhanced with a call to sparkContext.newAPIHadoopFile() with the proper input file format. Cheers On Wed, Dec 17, 2014 at 8:33 AM, Jerry Lam chiling...@gmail.com wrote: Hi Ted, Thanks for your help. I'm able to read lzo files using sparkContext.newAPIHadoopFile but I couldn't do the same for sqlContext because sqlContext.jsonFile does not provide ways to configure the input file format. Do you know if there are some APIs to do that? Best Regards, Jerry On Wed, Dec 17, 2014 at 11:27 AM, Ted Yu yuzhih...@gmail.com wrote: See this thread: http://search-hadoop.com/m/JW1q5HAuFv which references https://issues.apache.org/jira/browse/SPARK-2394 Cheers On Wed, Dec 17, 2014 at 8:21 AM, Jerry Lam chiling...@gmail.com wrote: Hi spark users, Do you know how to read json files using Spark SQL that are LZO compressed? I'm looking into sqlContext.jsonFile but I don't know how to configure it to read lzo files. Best Regards, Jerry
Implementing a spark version of Haskell's partition
Hi all, I would like to be able to split an RDD in two pieces according to a predicate. That would be equivalent to applying filter twice, with the predicate and its complement, which is also similar to Haskell's partition list function (http://hackage.haskell.org/package/base-4.7.0.1/docs/Data-List.html). Is there currently any way to do this in Spark? Or maybe someone has a suggestion about how to implement this by modifying the Spark source. I think this is valuable because sometimes I need to split an RDD into several groups that are too big to fit in the memory of a single thread, so pair RDDs are not a solution for those cases. A generalization to n parts of Haskell's partition would do the job. Thanks a lot for your help. Greetings, Juan Rodriguez
Re: Implementing a spark version of Haskell's partition
yo, First, here is the scala version: http://www.scala-lang.org/api/current/index.html#scala.collection.Seq@partition(p:A=>Boolean):(Repr,Repr) Second: an RDD is distributed, so what you'll have to do is either partition each partition (:-D) or create two RDDs by filtering twice → hence tasks will be scheduled distinctly, and data read twice. Choose what's best for you! hth, andy On Wed Dec 17 2014 at 5:57:56 PM Juan Rodríguez Hortalá juan.rodriguez.hort...@gmail.com wrote: Hi all, I would like to be able to split an RDD in two pieces according to a predicate. That would be equivalent to applying filter twice, with the predicate and its complement, which is also similar to Haskell's partition list function (http://hackage.haskell.org/package/base-4.7.0.1/docs/Data-List.html). Is there currently any way to do this in Spark? Or maybe someone has a suggestion about how to implement this by modifying the Spark source. I think this is valuable because sometimes I need to split an RDD into several groups that are too big to fit in the memory of a single thread, so pair RDDs are not a solution for those cases. A generalization to n parts of Haskell's partition would do the job. Thanks a lot for your help. Greetings, Juan Rodriguez
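For a local collection, Scala's built-in partition already does exactly what Haskell's partition does, and groupBy gives the n-way generalization in a single pass. A small self-contained sketch (plain Scala, no Spark):

```scala
// Local-collection analogue of Haskell's partition, plus an n-way split.
object PartitionDemo {
  def main(args: Array[String]): Unit = {
    val xs = List(1, 2, 3, 4, 5, 6)

    // Built-in partition: one pass, returns (matching, non-matching).
    val (evens, odds) = xs.partition(_ % 2 == 0)
    println(evens) // List(2, 4, 6)
    println(odds)  // List(1, 3, 5)

    // Generalization to n classes in a single pass via groupBy.
    val byMod3: Map[Int, List[Int]] = xs.groupBy(_ % 3)
    println(byMod3(0)) // List(3, 6)
  }
}
```

On an RDD there is no built-in equivalent that returns several independent RDDs; the closest options remain the two filter passes Andy describes (ideally after cache()), or keying each element by its class and working with the resulting pair RDD.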
wordcount job slow while input from NFS mount
Hi, A wordcounting job for about 1G text file takes 1 hour while input from a NFS mount. The same job took 30 seconds while input from local file system. Is there any tuning required for a NFS mount input? Thanks Larry
Re: Get the value of DStream[(String, Iterable[String])]
What I would like to do is to count the number of elements and, if it's greater than a number, iterate over all of them and store them in MySQL or another system. So I need to count them and also preserve the values, because I'm saving them in another system. I know about this map(line => ("key", line)); it was just a test. I want to change the key for a value which comes from a regular expression. 2014-12-17 17:28 GMT+01:00 Gerard Maas gerard.m...@gmail.com: You can create a DStream that contains the count, transforming the grouped windowed RDD, like this: val errorCount = grouping.map{ case (k, v) => v.size } If you need to preserve the key: val errorCount = grouping.map{ case (k, v) => (k, v.size) } Or, if you don't care about the content of the values, you could count directly instead of grouping first: val errorCount = mapErrorLines.countByWindow(Seconds(8), Seconds(4)) Not sure why you're using map(line => ("key", line)) as there only seems to be one key. If that's not required, we can simplify one more step: val errorCount = errorLines.countByWindow(Seconds(8), Seconds(4)) The question is: what do you want to do with that count afterwards? -kr, Gerard. On Wed, Dec 17, 2014 at 5:11 PM, Guillermo Ortiz konstt2...@gmail.com wrote: I'm a newbie with Spark... a simple question: val errorLines = lines.filter(_.contains("h")) val mapErrorLines = errorLines.map(line => ("key", line)) val grouping = errorLinesValue.groupByKeyAndWindow(Seconds(8), Seconds(4)) I get something like: 604: --- 605: Time: 141883218 ms 606: --- 607: (key,ArrayBuffer(h2, h3, h4)) Now, I would like to get that ArrayBuffer and count the number of elements. How could I get that ArrayBuffer? Something like: val values = grouping.getValue()... How could I do this in Spark with Scala? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Unable to start Spark 1.3 after building:java.lang. NoClassDefFoundError: org/codehaus/jackson/map/deser/std/StdDeserializer
Thanks for the correction, Sean. Do the docs need to be updated on this point, or is it safer for now just to note 2.4 specifically? On Wed Dec 17 2014 at 5:54:53 AM Sean Owen so...@cloudera.com wrote: Spark works fine with 2.4 *and later*. The docs don't mean to imply 2.4 is the last supported version. On Wed, Dec 17, 2014 at 10:19 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Spark 1.3 does not exist. Spark 1.2 hasn't been released just yet. Which version of Spark did you mean? Also, from what I can see in the docs, I believe the latest version of Hadoop that Spark supports is 2.4, not 2.6. Nick On Wed Dec 17 2014 at 2:09:56 AM Kyle Lin kylelin2...@gmail.com wrote: I also got the same problem.. 2014-12-09 22:58 GMT+08:00 Daniel Haviv danielru...@gmail.com: Hi, I've built spark 1.3 with hadoop 2.6 but when I startup the spark-shell I get the following exception: 14/12/09 06:54:24 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040 14/12/09 06:54:24 INFO util.Utils: Successfully started service 'SparkUI' on port 4040. 14/12/09 06:54:24 INFO ui.SparkUI: Started SparkUI at http://hdname:4040 14/12/09 06:54:25 INFO impl.TimelineClientImpl: Timeline service address: http://0.0.0.0:8188/ws/v1/timeline/ java.lang.NoClassDefFoundError: org/codehaus/jackson/map/deser/std/StdDeserializer at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:800) at java.security.SecureClassLoader.defineClass(SecureClassLoade r.java:142) Any idea why ? Thanks, Daniel
Re: Get the value of DStream[(String, Iterable[String])]
Basically what I want to do would be something like: val errorLines = lines.filter(_.contains("h")) val mapErrorLines = errorLines.map(line => ("key", line)) val grouping = errorLinesValue.groupByKeyAndWindow(Seconds(8), Seconds(4)) if (errorLinesValue.getValue().size() > X) { //iterate the values and do something for each element } I think that it must be pretty basic... argg. 2014-12-17 18:43 GMT+01:00 Guillermo Ortiz konstt2...@gmail.com: What I would like to do is to count the number of elements and, if it's greater than a number, iterate over all of them and store them in MySQL or another system. So I need to count them and also preserve the values, because I'm saving them in another system. I know about this map(line => ("key", line)); it was just a test. I want to change the key for a value which comes from a regular expression. 2014-12-17 17:28 GMT+01:00 Gerard Maas gerard.m...@gmail.com: You can create a DStream that contains the count, transforming the grouped windowed RDD, like this: val errorCount = grouping.map{ case (k, v) => v.size } If you need to preserve the key: val errorCount = grouping.map{ case (k, v) => (k, v.size) } Or, if you don't care about the content of the values, you could count directly instead of grouping first: val errorCount = mapErrorLines.countByWindow(Seconds(8), Seconds(4)) Not sure why you're using map(line => ("key", line)) as there only seems to be one key. If that's not required, we can simplify one more step: val errorCount = errorLines.countByWindow(Seconds(8), Seconds(4)) The question is: what do you want to do with that count afterwards? -kr, Gerard. On Wed, Dec 17, 2014 at 5:11 PM, Guillermo Ortiz konstt2...@gmail.com wrote: I'm a newbie with Spark... a simple question: val errorLines = lines.filter(_.contains("h")) val mapErrorLines = errorLines.map(line => ("key", line)) val grouping = errorLinesValue.groupByKeyAndWindow(Seconds(8), Seconds(4)) I get something like: 604: --- 605: Time: 141883218 ms 606: --- 607: (key,ArrayBuffer(h2, h3, h4)) Now, I would like to get that ArrayBuffer and count the number of elements. How could I get that ArrayBuffer? Something like: val values = grouping.getValue()... How could I do this in Spark with Scala? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Implementing a spark version of Haskell's partition
Hi Andy, thanks for your response. I already thought about filtering twice; that was what I meant by "that would be equivalent to applying filter twice". But I was thinking whether I could do it in a single pass, so that it could later be generalized to an arbitrary number of classes. I would also like to be able to generate RDDs instead of partitions of a single RDD, so I could use RDD methods like stats() on the fragments. But I think there is currently no RDD method that returns more than one RDD for a single input RDD, so maybe there is some design limitation in Spark that prevents this? Again, thanks for your answer. Greetings, Juan On 17/12/2014 18:15, andy petrella andy.petre...@gmail.com wrote: yo, First, here is the scala version: http://www.scala-lang.org/api/current/index.html#scala.collection.Seq@partition(p:A=>Boolean):(Repr,Repr) Second: an RDD is distributed, so what you'll have to do is either partition each partition (:-D) or create two RDDs by filtering twice → hence tasks will be scheduled distinctly, and data read twice. Choose what's best for you! hth, andy On Wed Dec 17 2014 at 5:57:56 PM Juan Rodríguez Hortalá juan.rodriguez.hort...@gmail.com wrote: Hi all, I would like to be able to split an RDD in two pieces according to a predicate. That would be equivalent to applying filter twice, with the predicate and its complement, which is also similar to Haskell's partition list function (http://hackage.haskell.org/package/base-4.7.0.1/docs/Data-List.html). Is there currently any way to do this in Spark? Or maybe someone has a suggestion about how to implement this by modifying the Spark source. I think this is valuable because sometimes I need to split an RDD into several groups that are too big to fit in the memory of a single thread, so pair RDDs are not a solution for those cases. A generalization to n parts of Haskell's partition would do the job. Thanks a lot for your help. Greetings, Juan Rodriguez
Re: spark streaming kafka best practices ?
Foreach is slightly more efficient because Spark doesn't bother to try and collect results from each task, since it's understood there will be no return type. I think the difference is very marginal though - it's mostly stylistic... typically you use foreach for something that is intended to produce a side effect and map for something that will return a new dataset. On Wed, Dec 17, 2014 at 5:43 AM, Gerard Maas gerard.m...@gmail.com wrote: Patrick, I was wondering why one would choose rdd.map vs rdd.foreach to execute a side-effecting function on an RDD. -kr, Gerard. On Sat, Dec 6, 2014 at 12:57 AM, Patrick Wendell pwend...@gmail.com wrote: The second choice is better. Once you call collect() you are pulling all of the data onto a single node; you want to do most of the processing in parallel on the cluster, which is what map() will do. Ideally you'd try to summarize the data or reduce it before calling collect(). On Fri, Dec 5, 2014 at 5:26 AM, david david...@free.fr wrote: hi, What is the best way to process a batch window in Spark Streaming: kafkaStream.foreachRDD(rdd => { rdd.collect().foreach(event => { // process the event process(event) }) }) Or kafkaStream.foreachRDD(rdd => { rdd.map(event => { // process the event process(event) }).collect() }) thank's -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-kafa-best-practices-tp20470.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
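The rule Patrick describes, that map returns a new dataset while foreach exists only for its side effects, can be seen on a plain Scala collection (the same semantics carry over to RDDs):

```scala
// map returns a new collection; foreach returns Unit and only side-effects.
import scala.collection.mutable.ArrayBuffer

object MapVsForeach {
  def main(args: Array[String]): Unit = {
    val events = Seq("a", "b", "c")

    // map: a transformation with a return value.
    val upper = events.map(_.toUpperCase)
    println(upper) // List(A, B, C)

    // foreach: side effect only, no return value.
    val sink = ArrayBuffer.empty[String]
    events.foreach(e => sink += e.toUpperCase)
    println(sink.toList) // List(A, B, C)
  }
}
```

In Spark the same distinction means rdd.foreach(process) runs process on the executors with nothing shipped back, whereas rdd.map(process).collect() additionally materializes the mapped results on the driver.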
Re: weird bytecode incompatibility issue between spark-core jar from mvn repo and official spark prebuilt binary
Just to clarify, are you running the application using spark-submit after packaging with sbt package? One thing that might help is to mark the Spark dependency as 'provided', as then you shouldn't have the Spark classes in your jar. Thanks Shivaram On Wed, Dec 17, 2014 at 4:39 AM, Sean Owen so...@cloudera.com wrote: You should use the same binaries everywhere. The problem here is that anonymous functions can get compiled to different names when you build differently, so you can end up with one function being called when another function is meant. On Wed, Dec 17, 2014 at 12:07 PM, Sun, Rui rui@intel.com wrote: Hi, I encountered a weird bytecode incompatibility issue between the spark-core jar from the mvn repo and the official Spark prebuilt binary. Steps to reproduce: 1. Download the official pre-built Spark binary 1.1.1 at http://d3kbcqa49mib13.cloudfront.net/spark-1.1.1-bin-hadoop1.tgz 2. Launch the Spark cluster in pseudo cluster mode 3. A small Scala app which calls RDD.saveAsObjectFile(): scalaVersion := "2.10.4" libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % "1.1.1" ) val sc = new SparkContext(args(0), "test") //args(0) is the Spark master URI val rdd = sc.parallelize(List(1, 2, 3)) rdd.saveAsObjectFile("/tmp/mysaoftmp") sc.stop throws an exception as follows: [error] (run-main-0) org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 6, ray-desktop.sh.intel.com): java.lang.ClassCastException: scala.Tuple2 cannot be cast to scala.collection.Iterator [error] org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596) [error] org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596) [error] org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) [error] org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) [error] org.apache.spark.rdd.RDD.iterator(RDD.scala:229) [error] org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) [error]
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) [error] org.apache.spark.rdd.RDD.iterator(RDD.scala:229) [error] org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) [error] org.apache.spark.scheduler.Task.run(Task.scala:54) [error] org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178) [error] java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146) [error] java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [error] java.lang.Thread.run(Thread.java:701) After investigation, I found that this is caused by bytecode incompatibility issue between RDD.class in spark-core_2.10-1.1.1.jar and the pre-built spark assembly respectively. This issue also happens with spark 1.1.0. Is there anything wrong in my usage of Spark? Or anything wrong in the process of deploying Spark module jars to maven repo? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
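A minimal build.sbt following Shivaram's suggestion, using the versions from the thread (adjust to your own Spark and Scala versions). Marking spark-core as "provided" compiles against it but keeps it out of the packaged jar, so the cluster's prebuilt assembly is the only copy of the Spark classes at runtime:

```scala
// build.sbt -- minimal sketch; versions match the thread above
scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  // "provided": available at compile time, excluded from the packaged jar,
  // so the anonymous-function classes come from one Spark build only.
  "org.apache.spark" %% "spark-core" % "1.1.1" % "provided"
)
```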
Re: Unable to start Spark 1.3 after building:java.lang. NoClassDefFoundError: org/codehaus/jackson/map/deser/std/StdDeserializer
Thanks for your replies. I was building spark from trunk. Daniel On Dec 17, 2014, at 19:49, Nicholas Chammas nicholas.cham...@gmail.com wrote: Thanks for the correction, Sean. Do the docs need to be updated on this point, or is it safer for now just to note 2.4 specifically? On Wed Dec 17 2014 at 5:54:53 AM Sean Owen so...@cloudera.com wrote: Spark works fine with 2.4 *and later*. The docs don't mean to imply 2.4 is the last supported version. On Wed, Dec 17, 2014 at 10:19 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Spark 1.3 does not exist. Spark 1.2 hasn't been released just yet. Which version of Spark did you mean? Also, from what I can see in the docs, I believe the latest version of Hadoop that Spark supports is 2.4, not 2.6. Nick On Wed Dec 17 2014 at 2:09:56 AM Kyle Lin kylelin2...@gmail.com wrote: I also got the same problem.. 2014-12-09 22:58 GMT+08:00 Daniel Haviv danielru...@gmail.com: Hi, I've built spark 1.3 with hadoop 2.6 but when I startup the spark-shell I get the following exception: 14/12/09 06:54:24 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040 14/12/09 06:54:24 INFO util.Utils: Successfully started service 'SparkUI' on port 4040. 14/12/09 06:54:24 INFO ui.SparkUI: Started SparkUI at http://hdname:4040 14/12/09 06:54:25 INFO impl.TimelineClientImpl: Timeline service address: http://0.0.0.0:8188/ws/v1/timeline/ java.lang.NoClassDefFoundError: org/codehaus/jackson/map/deser/std/StdDeserializer at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:800) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) Any idea why ? Thanks, Daniel
Re: Spark SQL 1.1.1 reading LZO compressed json files
You can create an RDD[String] using whatever method and pass that to jsonRDD. On Wed, Dec 17, 2014 at 8:33 AM, Jerry Lam chiling...@gmail.com wrote: Hi Ted, Thanks for your help. I'm able to read lzo files using sparkContext.newAPIHadoopFile but I couldn't do the same for sqlContext because sqlContext.josnFile does not provide ways to configure the input file format. Do you know if there are some APIs to do that? Best Regards, Jerry On Wed, Dec 17, 2014 at 11:27 AM, Ted Yu yuzhih...@gmail.com wrote: See this thread: http://search-hadoop.com/m/JW1q5HAuFv which references https://issues.apache.org/jira/browse/SPARK-2394 Cheers On Wed, Dec 17, 2014 at 8:21 AM, Jerry Lam chiling...@gmail.com wrote: Hi spark users, Do you know how to read json files using Spark SQL that are LZO compressed? I'm looking into sqlContext.jsonFile but I don't know how to configure it to read lzo files. Best Regards, Jerry
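Michael's suggestion can be sketched as follows. Note the assumptions: LzoTextInputFormat comes from the hadoop-lzo / elephant-bird packages (not from Spark itself), and the path is illustrative; this targets the Spark 1.1.x API where jsonRDD returns a SchemaRDD.

```scala
// Sketch: read LZO-compressed JSON into an RDD[String], then hand it to
// jsonRDD so Spark SQL infers the schema.
// Assumption: com.hadoop.mapreduce.LzoTextInputFormat from hadoop-lzo is
// on the classpath and the LZO codec is configured.
import com.hadoop.mapreduce.LzoTextInputFormat
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

object LzoJson {
  def run(sc: SparkContext): Unit = {
    val sqlContext = new SQLContext(sc)

    val raw = sc.newAPIHadoopFile(
      "hdfs:///data/*.json.lzo", // illustrative path
      classOf[LzoTextInputFormat],
      classOf[LongWritable],
      classOf[Text])

    // Drop the byte-offset keys; keep the JSON lines as strings.
    val jsonLines = raw.map { case (_, text) => text.toString }

    val schemaRDD = sqlContext.jsonRDD(jsonLines)
    schemaRDD.registerTempTable("events")
  }
}
```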
Re: When will Spark SQL support building DB index natively?
- Dev list Have you looked at partitioned table support? That would only scan data where the predicate matches the partition. Depending on the cardinality of the customerId column that could be a good option for you. On Wed, Dec 17, 2014 at 2:25 AM, Xuelin Cao xuelin...@yahoo.com.invalid wrote: Hi, In Spark SQL help document, it says Some of these (such as indexes) are less important due to Spark SQL’s in-memory computational model. Others are slotted for future releases of Spark SQL. - Block level bitmap indexes and virtual columns (used to build indexes) For our use cases, DB index is quite important. I have about 300G data in our database, and we always use customer id as a predicate for DB look up. Without DB index, we will have to scan all 300G data, and it will take 1 minute for a simple DB look up, while MySQL only takes 10 seconds. We tried to create an independent table for each customer id, the result is pretty good, but the logic will be very complex. I'm wondering when will Spark SQL supports DB index, and before that, is there an alternative way to support DB index function? Thanks
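Michael's pointer to partitioned tables can be sketched as below (Spark 1.x with Hive support); the table and column names are made up for illustration. A query whose predicate names the partition column only reads that partition's files:

```scala
// Sketch: Hive-style partitioned table so "WHERE customerId = ..." prunes
// to one partition instead of scanning all 300 GB.
// Table/column names are hypothetical.
import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext

object PartitionedLookup {
  def run(sc: SparkContext): Unit = {
    val hiveContext = new HiveContext(sc)

    hiveContext.sql(
      """CREATE TABLE IF NOT EXISTS orders (amount DOUBLE, ts BIGINT)
        |PARTITIONED BY (customerId INT)""".stripMargin)

    // Partition pruning: only the customerId=42 directory is scanned.
    val rows = hiveContext.sql("SELECT * FROM orders WHERE customerId = 42")
    rows.collect().foreach(println)
  }
}
```

As Michael notes, this only pays off when customerId's cardinality is moderate; a very high-cardinality partition column produces huge numbers of tiny partitions.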
SparkSQL 1.2.1-snapshot Left Join problem
Hi, When running SparkSQL branch 1.2.1 on an EC2 standalone cluster, the following query does not work: create table debug as select v1.* from t1 as v1 left join t2 as v2 on v1.sku = v2.sku where v2.sku is null Both t1 and t2 have 200 partitions. t1 has 10k rows, and t2 has 4k rows. This query blocks at: 14/12/17 15:56:54 INFO TaskSetManager: Finished task 133.0 in stage 2.0 (TID 541) in 370 ms on ip-10-79-184-49.ec2.internal (122/200) Via the WebUI, I can see there are 24 tasks running, as the cluster has 24 cores. The other tasks have succeeded. It seems that the 24 tasks are blocked and won't end. However, SparkSQL 1.1.0 works fine. There might be some problem with join on 1.2.1. Any idea? Hao -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-1-2-1-snapshot-Left-Join-problem-tp20748.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark SQL 1.1.1 reading LZO compressed json files
Hi Michael, This is what I did. I was thinking if there is a more efficient way to accomplish this. I was doing a very simple benchmark: Convert lzo compressed json files to parquet files using SparkSQL vs. Hadoop MR. Spark SQL seems to require 2 stages to accomplish this task: Stage 1: read the lzo files using newAPIHadoopFile with LzoTextInputFormat and then convert it to JsonRDD Stage 2: saveAsParquetFile from the JsonRDD In Hadoop, it takes 1 step, a map-only job to read the data and then output the json to the parquet file (I'm using elephant bird LzoJsonLoader to load the files) In some scenarios, Hadoop is faster because it is saving one stage. Did I do something wrong? Best Regards, Jerry On Wed, Dec 17, 2014 at 1:29 PM, Michael Armbrust mich...@databricks.com wrote: You can create an RDD[String] using whatever method and pass that to jsonRDD. On Wed, Dec 17, 2014 at 8:33 AM, Jerry Lam chiling...@gmail.com wrote: Hi Ted, Thanks for your help. I'm able to read lzo files using sparkContext.newAPIHadoopFile but I couldn't do the same for sqlContext because sqlContext.josnFile does not provide ways to configure the input file format. Do you know if there are some APIs to do that? Best Regards, Jerry On Wed, Dec 17, 2014 at 11:27 AM, Ted Yu yuzhih...@gmail.com wrote: See this thread: http://search-hadoop.com/m/JW1q5HAuFv which references https://issues.apache.org/jira/browse/SPARK-2394 Cheers On Wed, Dec 17, 2014 at 8:21 AM, Jerry Lam chiling...@gmail.com wrote: Hi spark users, Do you know how to read json files using Spark SQL that are LZO compressed? I'm looking into sqlContext.jsonFile but I don't know how to configure it to read lzo files. Best Regards, Jerry
Re: S3 globbing
Hi Akhil, Thanks for your time, I appreciate it. I tried this approach, but I'm getting either fewer or more files, not exactly the last hour's files. Is there any way I can specify the range (from this time to this time)? Thanks, D On Tue, Dec 16, 2014 at 11:04 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Did you try something like: //Get the last hour val d = (System.currentTimeMillis() - 3600 * 1000) val ex = "abc_" + d.toString().substring(0,7) + "*.json" Thanks Best Regards On Wed, Dec 17, 2014 at 5:05 AM, durga durgak...@gmail.com wrote: Hi All, I need help with the regex in my sc.textFile(). I have lots of files with an epoch-millisecond timestamp, e.g. abc_1418759383723.json. Now I need to consume the last hour's files using the epoch timestamp as mentioned above. I tried a couple of options; nothing seems to be working for me. If any one of you faced this issue and got a solution, please help me. Appreciating your help, Thanks, D -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/S3-globbing-tp20731.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
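A substring glob over epoch milliseconds cannot express an arbitrary time range, so one alternative is to list the candidate file names, filter them by the parsed timestamp, and pass the matching names (comma-joined) to sc.textFile. A plain-Scala sketch of the filtering step, with hypothetical file names following the abc_<epochMillis>.json pattern from the thread:

```scala
// Select file names whose embedded epoch-millis timestamp falls in
// [startMs, endMs). Names are assumed to look like "abc_1418759383723.json".
object TimeRangeFilter {
  private val TsPattern = """abc_(\d+)\.json""".r

  def inRange(names: Seq[String], startMs: Long, endMs: Long): Seq[String] =
    names.filter {
      case TsPattern(ts) => ts.toLong >= startMs && ts.toLong < endMs
      case _             => false // non-matching names are skipped
    }

  def main(args: Array[String]): Unit = {
    val files = Seq("abc_1000.json", "abc_2000.json", "abc_3000.json", "junk.txt")
    println(inRange(files, 1500L, 3000L)) // List(abc_2000.json)
  }
}
```

In a Spark job one would obtain the names (e.g. via the Hadoop FileSystem listing API or an S3 client), filter with inRange for the wanted hour, and call sc.textFile(matching.mkString(",")).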
Re: Spark SQL 1.1.1 reading LZO compressed json files
The first pass is inferring the schema of the JSON data. If you already know the schema you can skip this pass by specifying the schema as the second parameter to jsonRDD. On Wed, Dec 17, 2014 at 10:59 AM, Jerry Lam chiling...@gmail.com wrote: Hi Michael, This is what I did. I was thinking if there is a more efficient way to accomplish this. I was doing a very simple benchmark: Convert lzo compressed json files to parquet files using SparkSQL vs. Hadoop MR. Spark SQL seems to require 2 stages to accomplish this task: Stage 1: read the lzo files using newAPIHadoopFile with LzoTextInputFormat and then convert it to JsonRDD Stage 2: saveAsParquetFile from the JsonRDD In Hadoop, it takes 1 step, a map-only job to read the data and then output the json to the parquet file (I'm using elephant bird LzoJsonLoader to load the files) In some scenarios, Hadoop is faster because it is saving one stage. Did I do something wrong? Best Regards, Jerry On Wed, Dec 17, 2014 at 1:29 PM, Michael Armbrust mich...@databricks.com wrote: You can create an RDD[String] using whatever method and pass that to jsonRDD. On Wed, Dec 17, 2014 at 8:33 AM, Jerry Lam chiling...@gmail.com wrote: Hi Ted, Thanks for your help. I'm able to read lzo files using sparkContext.newAPIHadoopFile but I couldn't do the same for sqlContext because sqlContext.josnFile does not provide ways to configure the input file format. Do you know if there are some APIs to do that? Best Regards, Jerry On Wed, Dec 17, 2014 at 11:27 AM, Ted Yu yuzhih...@gmail.com wrote: See this thread: http://search-hadoop.com/m/JW1q5HAuFv which references https://issues.apache.org/jira/browse/SPARK-2394 Cheers On Wed, Dec 17, 2014 at 8:21 AM, Jerry Lam chiling...@gmail.com wrote: Hi spark users, Do you know how to read json files using Spark SQL that are LZO compressed? I'm looking into sqlContext.jsonFile but I don't know how to configure it to read lzo files. Best Regards, Jerry
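A sketch of Michael's suggestion against the 1.1/1.2 API, where StructType and friends are exposed under org.apache.spark.sql (they moved to org.apache.spark.sql.types in later releases). The field names and types below are made up for illustration:

```scala
// Sketch: skip the schema-inference pass by giving jsonRDD an explicit schema.
// Field names are hypothetical; import paths follow the Spark 1.1/1.2 layout.
import org.apache.spark.SparkContext
import org.apache.spark.sql.{SQLContext, StructType, StructField, StringType, LongType}

object JsonWithSchema {
  def run(sc: SparkContext): Unit = {
    val sqlContext = new SQLContext(sc)

    // Any RDD[String] works here, including one built from LZO files.
    val jsonLines = sc.textFile("hdfs:///data/*.json")

    val schema = StructType(Seq(
      StructField("user", StringType, nullable = true),
      StructField("ts", LongType, nullable = true)))

    // One pass only: no sampling job to infer the schema.
    val events = sqlContext.jsonRDD(jsonLines, schema)
    events.registerTempTable("events")
  }
}
```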
Re: Spark SQL 1.1.1 reading LZO compressed json files
To be a little more clear: jsonRDD and jsonFile use the same implementation underneath. jsonFile is just a convenience method that does jsonRDD(sc.textFile(...)). On Wed, Dec 17, 2014 at 11:37 AM, Michael Armbrust mich...@databricks.com wrote: The first pass is inferring the schema of the JSON data. If you already know the schema you can skip this pass by specifying the schema as the second parameter to jsonRDD. On Wed, Dec 17, 2014 at 10:59 AM, Jerry Lam chiling...@gmail.com wrote: Hi Michael, This is what I did. I was thinking if there is a more efficient way to accomplish this. I was doing a very simple benchmark: Convert lzo compressed json files to parquet files using SparkSQL vs. Hadoop MR. Spark SQL seems to require 2 stages to accomplish this task: Stage 1: read the lzo files using newAPIHadoopFile with LzoTextInputFormat and then convert it to JsonRDD Stage 2: saveAsParquetFile from the JsonRDD In Hadoop, it takes 1 step, a map-only job to read the data and then output the json to the parquet file (I'm using elephant bird LzoJsonLoader to load the files) In some scenarios, Hadoop is faster because it is saving one stage. Did I do something wrong? Best Regards, Jerry On Wed, Dec 17, 2014 at 1:29 PM, Michael Armbrust mich...@databricks.com wrote: You can create an RDD[String] using whatever method and pass that to jsonRDD. On Wed, Dec 17, 2014 at 8:33 AM, Jerry Lam chiling...@gmail.com wrote: Hi Ted, Thanks for your help. I'm able to read lzo files using sparkContext.newAPIHadoopFile but I couldn't do the same for sqlContext because sqlContext.jsonFile does not provide ways to configure the input file format. Do you know if there are some APIs to do that?
Best Regards, Jerry On Wed, Dec 17, 2014 at 11:27 AM, Ted Yu yuzhih...@gmail.com wrote: See this thread: http://search-hadoop.com/m/JW1q5HAuFv which references https://issues.apache.org/jira/browse/SPARK-2394 Cheers On Wed, Dec 17, 2014 at 8:21 AM, Jerry Lam chiling...@gmail.com wrote: Hi spark users, Do you know how to read json files using Spark SQL that are LZO compressed? I'm looking into sqlContext.jsonFile but I don't know how to configure it to read lzo files. Best Regards, Jerry
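Putting the thread's two suggestions together -- read the LZO files as lines with newAPIHadoopFile, then hand jsonRDD an explicit schema so the inference pass is skipped -- could look roughly like this. This is a sketch against the Spark 1.1-era API: the LzoTextInputFormat class name depends on which hadoop-lzo build is on the classpath, the StructType import path varied across 1.x releases, and the paths and field names are hypothetical.

```scala
import com.hadoop.mapreduce.LzoTextInputFormat // assumed hadoop-lzo class name
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.spark.sql.catalyst.types._   // location varies by Spark 1.x version

// Stage 1 equivalent: read the LZO-compressed files as plain JSON lines.
val lines = sc.newAPIHadoopFile(
  "/data/events.json.lzo", classOf[LzoTextInputFormat],
  classOf[LongWritable], classOf[Text]).map(_._2.toString)

// Hypothetical schema; supplying it to jsonRDD skips the inference pass.
val schema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("name", StringType, nullable = true)))

sqlContext.jsonRDD(lines, schema).saveAsParquetFile("/data/events.parquet")
```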
Re: wordcount job slow while input from NFS mount
The problem is very likely NFS, not Spark. What kind of network is it mounted over? You can also test the performance of your NFS by copying a file from it to a local disk or to /dev/null and seeing how many bytes per second it can copy. Matei On Dec 17, 2014, at 9:38 AM, Larryliu larryli...@gmail.com wrote: A wordcounting job for about 1G text file takes 1 hour while input from a NFS mount. The same job took 30 seconds while input from local file system. Is there any tuning required for a NFS mount input? Thanks Larry -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/wordcount-job-slow-while-input-from-NFS-mount-tp20747.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
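Matei's throughput check can be scripted like this. On a real setup, INPUT would point at a file on the NFS mount; so the commands below run anywhere, the sketch generates its own 64 MB local test file, which is an assumption, not part of the original advice.

```shell
# Measure raw read throughput by copying a file to /dev/null and
# reading the bytes/sec that dd reports on stderr.
# Replace the generated file with a path on the NFS mount to test NFS.
INPUT=$(mktemp)
dd if=/dev/zero of="$INPUT" bs=1M count=64 2>/dev/null
dd if="$INPUT" of=/dev/null bs=1M
rm -f "$INPUT"
```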
Re: wordcount job slow while input from NFS mount
Hi, Matei Thanks for your response. I tried copying the file (1G) from NFS and it took 10 seconds. The NFS mount is in a LAN environment and the NFS server is running on the same server that Spark is running on, so basically I mount the NFS on the same bare-metal machine. Larry On Wed, Dec 17, 2014 at 11:42 AM, Matei Zaharia matei.zaha...@gmail.com wrote: The problem is very likely NFS, not Spark. What kind of network is it mounted over? You can also test the performance of your NFS by copying a file from it to a local disk or to /dev/null and seeing how many bytes per second it can copy. Matei On Dec 17, 2014, at 9:38 AM, Larryliu larryli...@gmail.com wrote: A wordcounting job for about 1G text file takes 1 hour while input from a NFS mount. The same job took 30 seconds while input from local file system. Is there any tuning required for a NFS mount input? Thanks Larry -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/wordcount-job-slow-while-input-from-NFS-mount-tp20747.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: wordcount job slow while input from NFS mount
I see, you may have something else configured weirdly then. You should look at CPU and disk utilization while your Spark job is reading from NFS and, if you see high CPU use, run jstack to see where the process is spending time. Also make sure Spark's local work directories (spark.local.dir) are not on NFS. They shouldn't be though, that should be /tmp. Matei On Dec 17, 2014, at 11:56 AM, Larry Liu larryli...@gmail.com wrote: Hi, Matei Thanks for your response. I tried to copy the file (1G) from NFS and took 10 seconds. The NFS mount is a LAN environment and the NFS server is running on the same server that Spark is running on. So basically I mount the NFS on the same bare metal machine. Larry On Wed, Dec 17, 2014 at 11:42 AM, Matei Zaharia matei.zaha...@gmail.com wrote: The problem is very likely NFS, not Spark. What kind of network is it mounted over? You can also test the performance of your NFS by copying a file from it to a local disk or to /dev/null and seeing how many bytes per second it can copy. Matei On Dec 17, 2014, at 9:38 AM, Larryliu larryli...@gmail.com wrote: A wordcounting job for about 1G text file takes 1 hour while input from a NFS mount. The same job took 30 seconds while input from local file system. Is there any tuning required for a NFS mount input? Thanks Larry -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/wordcount-job-slow-while-input-from-NFS-mount-tp20747.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
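The spark.local.dir check above can be sketched as follows; the /data/spark-local path is a hypothetical local-disk directory, i.e. anywhere that is not on the NFS mount.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Keep Spark's shuffle/spill scratch space off the NFS mount.
// "/data/spark-local" is a placeholder for a local-disk path.
val conf = new SparkConf()
  .setAppName("wordcount")
  .set("spark.local.dir", "/data/spark-local")
val sc = new SparkContext(conf)
```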
Re: Nabble mailing list mirror errors: This post has NOT been accepted by the mailing list yet
Yeah, it looks like messages that are successfully posted via Nabble end up on the Apache mailing list, but messages posted directly to Apache aren't mirrored to Nabble anymore because it's based off the incubator mailing list. We should fix this so that Nabble posts to / archives the non-incubator list. On Sat, Dec 13, 2014 at 6:27 PM, Yana Kadiyska yana.kadiy...@gmail.com wrote: Since you mentioned this, I had a related quandary recently -- it also says that the forum archives u...@spark.incubator.apache.org and d...@spark.incubator.apache.org respectively, yet the Community page clearly says to email the @spark.apache.org list (but the nabble archive is linked right there too). IMO even putting a clear explanation at the top -- "Posting here requires that you create an account via the UI. Your message will be sent to both spark.incubator.apache.org and spark.apache.org" (if that is the case, I'm not sure which alias nabble posts get sent to) -- would make things a lot more clear. On Sat, Dec 13, 2014 at 5:05 PM, Josh Rosen rosenvi...@gmail.com wrote: I've noticed that several users are attempting to post messages to Spark's user / dev mailing lists using the Nabble web UI ( http://apache-spark-user-list.1001560.n3.nabble.com/ ). However, there are many posts in Nabble that are not posted to the Apache lists and are flagged with "This post has NOT been accepted by the mailing list yet." errors. I suspect that the issue is that users are not completing the sign-up confirmation process ( http://apache-spark-user-list.1001560.n3.nabble.com/mailing_list/MailingListOptions.jtp?forum=1 ), which is preventing their emails from being accepted by the mailing list. I wanted to mention this issue to the Spark community to see whether there are any good solutions to address this. I have spoken to users who think that our mailing list is unresponsive / inactive because their un-posted messages haven't received any replies. 
- Josh
Re: wordcount job slow while input from NFS mount
Thanks, Matei. I will give it a try. Larry On Wed, Dec 17, 2014 at 1:01 PM, Matei Zaharia matei.zaha...@gmail.com wrote: I see, you may have something else configured weirdly then. You should look at CPU and disk utilization while your Spark job is reading from NFS and, if you see high CPU use, run jstack to see where the process is spending time. Also make sure Spark's local work directories (spark.local.dir) are not on NFS. They shouldn't be though, that should be /tmp. Matei On Dec 17, 2014, at 11:56 AM, Larry Liu larryli...@gmail.com wrote: Hi, Matei Thanks for your response. I tried to copy the file (1G) from NFS and took 10 seconds. The NFS mount is a LAN environment and the NFS server is running on the same server that Spark is running on. So basically I mount the NFS on the same bare metal machine. Larry On Wed, Dec 17, 2014 at 11:42 AM, Matei Zaharia matei.zaha...@gmail.com wrote: The problem is very likely NFS, not Spark. What kind of network is it mounted over? You can also test the performance of your NFS by copying a file from it to a local disk or to /dev/null and seeing how many bytes per second it can copy. Matei On Dec 17, 2014, at 9:38 AM, Larryliu larryli...@gmail.com wrote: A wordcounting job for about 1G text file takes 1 hour while input from a NFS mount. The same job took 30 seconds while input from local file system. Is there any tuning required for a NFS mount input? Thanks Larry -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/wordcount-job-slow-while-input-from-NFS-mount-tp20747.html Sent from the Apache Spark User List mailing list archive at Nabble.com . - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Help with updateStateByKey
I am trying to run stateful Spark Streaming computations over (fake) apache web server logs read from Kafka. The goal is to sessionize the web traffic similar to this blog post: http://blog.cloudera.com/blog/2014/11/how-to-do-near-real-time-sessionization-with-spark-streaming-and-apache-hadoop/ The only difference is that I want to sessionize each page the IP hits, instead of the entire session. I was able to do this reading from a file of fake web traffic using Spark in batch mode, but now I want to do it in a streaming context. Log files are read from Kafka and parsed into K/V pairs of (String, (String, Long, Long)) or (IP, (requestPage, time, time)) I then call groupByKey() on this K/V pair. In batch mode, this would produce a: (String, CollectionBuffer((String, Long, Long), ...)) or (IP, CollectionBuffer((requestPage, time, time), ...)) In a StreamingContext, it produces a: (String, ArrayBuffer((String, Long, Long), ...)) like so: (183.196.254.131,ArrayBuffer((/test.php,1418849762000,1418849762000))) However, as the next microbatch (DStream) arrives, this information is discarded. Ultimately what I want is for that ArrayBuffer to fill up over time as a given IP continues to interact, and to run some computations on its data to sessionize the page time. I believe the operator to make that happen is updateStateByKey. I'm having some trouble with this operator (I'm new to both Spark and Scala); any help is appreciated. Thus far: val grouped = ipTimeStamp.groupByKey().updateStateByKey(updateGroupByKey) def updateGroupByKey( a: Seq[(String, ArrayBuffer[(String, Long, Long)])], b: Option[(String, ArrayBuffer[(String, Long, Long)])] ): Option[(String, ArrayBuffer[(String, Long, Long)])] = { } - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
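For reference, a hedged sketch of how the update function usually looks: updateStateByKey invokes it once per key per batch with that batch's new values (Seq[V]) and the previous state (Option[S]); the key itself is not an argument, and the streaming context must be checkpointed for stateful operations to work. The names and checkpoint path below are illustrative.

```scala
import scala.collection.mutable.ArrayBuffer

type Hit = (String, Long, Long) // (requestPage, startTime, endTime)

// Called once per IP per batch: fold this batch's grouped hits into
// the ArrayBuffer of hits carried over from earlier batches.
def updateHits(
    newValues: Seq[Iterable[Hit]],
    state: Option[ArrayBuffer[Hit]]): Option[ArrayBuffer[Hit]] = {
  val buf = state.getOrElse(ArrayBuffer.empty[Hit])
  newValues.foreach(buf ++= _)
  Some(buf) // returning None would drop the key's state
}

ssc.checkpoint("/tmp/checkpoints") // required for stateful operators
val sessions = ipTimeStamp.groupByKey().updateStateByKey(updateHits)
```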
spark-sql with join terribly slow.
Guys, I'm trying to join 2-3 schemaRDDs of approx 30,000 rows each and it is terribly slow. No doubt I get the results, but it takes 8s to do the join and get the results. I'm running on a standalone Spark cluster on my machine, which has 8 cores and 12gb RAM, with 4 workers. Not sure why it is consuming so much time; any inputs appreciated. This is just an e.g. of what I'm trying to do. RDD1 (30,000 rows): state,city,amount RDD2 (50 rows): state,amount1 join by state New RDD3 (30,000 rows): state,city,amount,amount1 Do a select (amount-amount1) from New RDD3. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-with-join-terribly-slow-tp20751.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
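Since RDD2 is tiny (~50 rows), one common workaround -- a sketch, not something proposed in the thread -- is a map-side join: collect the small table to the driver, broadcast it, and look it up per row, which avoids the shuffle that a regular join triggers. Field names and types are illustrative.

```scala
// rdd1: RDD[(String, String, Double)] = (state, city, amount)
// rdd2: RDD[(String, Double)]         = (state, amount1)
val small: Map[String, Double] = rdd2.collect().toMap
val smallBc = sc.broadcast(small)

// Map-side join plus the (amount - amount1) projection in one pass.
val result = rdd1.flatMap { case (state, city, amount) =>
  smallBc.value.get(state).map(amount1 => (state, city, amount - amount1))
}
```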
building with Hadoop 1.0.4 / where is hadoop-yarn-common:1.0.4 ?
I tried sending this message to the users list several hours ago, but it did not get distributed. I was just trying to build Spark, v1.1.1 with defaults. It sets hadoop.version to 1.0.4, and yarn.version to hadoop.version, the dependency entry for org.apache.hadoop:hadoop-yarn-common sets version to ${yarn.version}. When you build it, you will get: Failure to find org.apache.hadoop:hadoop-yarn-common:jar:1.0.4 I’m not sure, but I don’t think there was ever a 1.0.4 version of hadoop-yarn-common, or at least that’s the case according to: http://mvnrepository.com/artifact/org.apache.hadoop/hadoop-yarn-common ? Is this a bug, or is there a repo somewhere that contains that artifact? Thanks, Tim
Re: building with Hadoop 1.0.4 / where is hadoop-yarn-common:1.0.4 ?
There's no such version of YARN. But you only build the YARN support when you set -Pyarn. Then, yes you have to set yarn.version separately for earlier versions that didn't match up with Hadoop versions. http://spark.apache.org/docs/latest/building-with-maven.html On Thu, Dec 18, 2014 at 12:35 AM, Tim Harsch thar...@cray.com wrote: I tried sending this message to the users list several hours ago, but it did not get distributed. I was just trying to build Spark, v1.1.1 with defaults. It sets hadoop.version to 1.0.4, and yarn.version to hadoop.version, the dependency entry for org.apache.hadoop:hadoop-yarn-common sets version to ${yarn.version}. When you build it, you will get: Failure to find org.apache.hadoop:hadoop-yarn-common:jar:1.0.4 I’m not sure, but I don’t think there was ever a 1.0.4 version of hadoop-yarn-common, or at least that’s the case according to: http://mvnrepository.com/artifact/org.apache.hadoop/hadoop-yarn-common ? Is this a bug, or is there a repo somewhere that contains that artifact? Thanks, Tim - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
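To make Sean's point concrete, a sketch of the two build modes; the 2.2.0 version below is just an example of a release that actually shipped YARN artifacts, and the exact profile names depend on the Spark version being built.

```shell
# Default build (Hadoop 1.0.4, no -Pyarn): hadoop-yarn-common is never
# resolved, so the nonexistent 1.0.4 YARN artifact is not a problem.
mvn -DskipTests clean package

# YARN build: yarn.version must name a real YARN release, set
# separately from hadoop.version when the two don't line up.
mvn -Pyarn -Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 -DskipTests clean package
```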
Re: java.io.NotSerializableException: org.apache.avro.mapred.AvroKey using spark with avro
Yeah, I have the same problem with 1.1.0, but not 1.0.0. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/java-io-NotSerializableException-org-apache-avro-mapred-AvroKey-using-spark-with-avro-tp15165p20752.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
SPARK-2243 Support multiple SparkContexts in the same JVM
Greetings, First comment on the issue says that the reason for not supporting multiple contexts is: "There are numerous assumptions in the code base that uses a shared cache or thread local variables or some global identifiers which prevent us from using multiple SparkContext's." Could it be worked around by creating those contexts in several classloaders, each with its own copy of the Spark classes? Thanks, Anton
Re: No disk single pass RDD aggregation
Jim Carroll wrote: Okay, I have an rdd that I want to run an aggregate over but it insists on spilling to disk even though I structured the processing to only require a single pass. In other words, I can do all of my processing one entry in the rdd at a time without persisting anything. I set rdd.persist(StorageLevel.NONE) and it had no effect. When I run locally I get my /tmp directory filled with transient rdd data even though I never need the data again after the row's been processed. Is there a way to turn this off? Thanks Jim hi, Do you have many input files? If so, try conf.set("spark.shuffle.consolidateFiles", "true") Hope this helps. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/No-disk-single-pass-RDD-aggregation-tp20723p20753.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: SPARK-2243 Support multiple SparkContexts in the same JVM
Hi Anton, That could solve some of the issues (I've played with that a little bit). But there are still some areas where this would be sub-optimal, because Spark still uses system properties in some places and those are global, not per-class loader. (SparkSubmit is the biggest offender here, but if you're doing multiple contexts in the same VM you're probably not using SparkSubmit. The rest of the code is a lot better but I wouldn't count on it being 100% safe.) On Wed, Dec 17, 2014 at 6:23 PM, Anton Brazhnyk anton.brazh...@genesys.com wrote: Greetings, First comment on the issue says that reason for non-supporting of multiple contexts is “There are numerous assumptions in the code base that uses a shared cache or thread local variables or some global identifiers which prevent us from using multiple SparkContext's.” May it be worked around by creating those context in several classloaders with their own copies of Spark classes? Thanks, Anton -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: MLLib /ALS : java.lang.OutOfMemoryError: Java heap space
I am not sure this can help you. I have 57 million ratings, about 4 million users and 4k items. I used 7-14 total executor cores with executor-memory 13g; the cluster has 4 nodes, each with 4 cores and max memory 16g. I found that the following settings may help avoid this problem: conf.set("spark.shuffle.memoryFraction", "0.65") // default is 0.2 conf.set("spark.storage.memoryFraction", "0.3") // default is 0.6 I have to keep the rank value under 40, otherwise this problem occurs. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/MLLib-ALS-java-lang-OutOfMemoryError-Java-heap-space-tp20584p20755.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
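The settings above plus a lower rank fit together roughly like this (MLlib 1.x API; the input path, rank, iteration count, and lambda are illustrative values, not from the thread):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.recommendation.{ALS, Rating}

val conf = new SparkConf()
  .setAppName("als-example")
  .set("spark.shuffle.memoryFraction", "0.65") // default 0.2
  .set("spark.storage.memoryFraction", "0.3")  // default 0.6
val sc = new SparkContext(conf)

// Hypothetical "user,item,rating" CSV input.
val ratings = sc.textFile("ratings.csv").map { line =>
  val Array(user, item, r) = line.split(',')
  Rating(user.toInt, item.toInt, r.toDouble)
}

// Keeping rank below ~40 bounds the size of the factor matrices.
val model = ALS.train(ratings, 30, 10, 0.01) // rank, iterations, lambda
```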
RE: weird bytecode incompatability issue between spark-core jar from mvn repo and official spark prebuilt binary
Sean, Yes, the problem is exactly anonymous function mis-matching as you described So if an Spark app (driver) depends on a Spark module jar (for example spark-core) to programmatically communicate with a Spark cluster, user should not use pre-built Spark binary but build Spark from the source and publish the module jars into local maven repo And then build the app to make sure the binary is same. It makes no sense to publish Spark module jars into the central maven repo because binary compatibility with a Spark cluster of the same version is not ensured. Is my understanding correct? -Original Message- From: Sean Owen [mailto:so...@cloudera.com] Sent: Wednesday, December 17, 2014 8:39 PM To: Sun, Rui Cc: user@spark.apache.org Subject: Re: weird bytecode incompatability issue between spark-core jar from mvn repo and official spark prebuilt binary You should use the same binaries everywhere. The problem here is that anonymous functions get compiled to different names when you build different (potentially) so you actually have one function being called when another function is meant. On Wed, Dec 17, 2014 at 12:07 PM, Sun, Rui rui@intel.com wrote: Hi, I encountered a weird bytecode incompatability issue between spark-core jar from mvn repo and official spark prebuilt binary. Steps to reproduce: 1. Download the official pre-built Spark binary 1.1.1 at http://d3kbcqa49mib13.cloudfront.net/spark-1.1.1-bin-hadoop1.tgz 2. Launch the Spark cluster in pseudo cluster mode 3. 
A small Scala app which calls RDD.saveAsObjectFile(): scalaVersion := "2.10.4" libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % "1.1.1" ) val sc = new SparkContext(args(0), "test") // args(0) is the Spark master URI val rdd = sc.parallelize(List(1, 2, 3)) rdd.saveAsObjectFile("/tmp/mysaoftmp") sc.stop throws an exception as follows: [error] (run-main-0) org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 6, ray-desktop.sh.intel.com): java.lang.ClassCastException: scala.Tuple2 cannot be cast to scala.collection.Iterator [error] org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596) [error] org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596) [error] org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) [error] org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) [error] org.apache.spark.rdd.RDD.iterator(RDD.scala:229) [error] org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) [error] org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) [error] org.apache.spark.rdd.RDD.iterator(RDD.scala:229) [error] org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) [error] org.apache.spark.scheduler.Task.run(Task.scala:54) [error] org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178) [error] java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146) [error] java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [error] java.lang.Thread.run(Thread.java:701) After investigation, I found that this is caused by a bytecode incompatibility issue between RDD.class in spark-core_2.10-1.1.1.jar and the pre-built spark assembly respectively. This issue also happens with spark 1.1.0. Is there anything wrong in my usage of Spark? Or anything wrong in the process of deploying Spark module jars to maven repo?
RE: weird bytecode incompatability issue between spark-core jar from mvn repo and official spark prebuilt binary
Not using spark-submit. The app directly communicates with the Spark cluster in standalone mode. If the Spark dependency is marked as 'provided', then a spark-core jar elsewhere must be pointed to in CLASSPATH. However, the pre-built Spark binary only has an assembly jar, not individual module jars, so there is no way to point to a module jar which is the same binary as that in the pre-built Spark distribution. Maybe the Spark distribution should contain not only the assembly jar but also the individual module jars. Any opinion? From: Shivaram Venkataraman [mailto:shiva...@eecs.berkeley.edu] Sent: Thursday, December 18, 2014 2:20 AM To: Sean Owen Cc: Sun, Rui; user@spark.apache.org Subject: Re: weird bytecode incompatability issue between spark-core jar from mvn repo and official spark prebuilt binary Just to clarify, are you running the application using spark-submit after packaging with sbt package? One thing that might help is to mark the Spark dependency as 'provided' as then you shouldn't have the Spark classes in your jar. Thanks Shivaram On Wed, Dec 17, 2014 at 4:39 AM, Sean Owen so...@cloudera.com wrote: You should use the same binaries everywhere. The problem here is that anonymous functions get compiled to different names when you build differently (potentially), so you actually have one function being called when another function is meant. On Wed, Dec 17, 2014 at 12:07 PM, Sun, Rui rui@intel.com wrote: Hi, I encountered a weird bytecode incompatability issue between spark-core jar from mvn repo and official spark prebuilt binary. Steps to reproduce: 1. Download the official pre-built Spark binary 1.1.1 at http://d3kbcqa49mib13.cloudfront.net/spark-1.1.1-bin-hadoop1.tgz 2. Launch the Spark cluster in pseudo cluster mode 3. 
A small Scala app which calls RDD.saveAsObjectFile(): scalaVersion := "2.10.4" libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % "1.1.1" ) val sc = new SparkContext(args(0), "test") // args(0) is the Spark master URI val rdd = sc.parallelize(List(1, 2, 3)) rdd.saveAsObjectFile("/tmp/mysaoftmp") sc.stop throws an exception as follows: [error] (run-main-0) org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 6, ray-desktop.sh.intel.com): java.lang.ClassCastException: scala.Tuple2 cannot be cast to scala.collection.Iterator [error] org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596) [error] org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596) [error] org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) [error] org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) [error] org.apache.spark.rdd.RDD.iterator(RDD.scala:229) [error] org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) [error] org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) [error] org.apache.spark.rdd.RDD.iterator(RDD.scala:229) [error] org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) [error] org.apache.spark.scheduler.Task.run(Task.scala:54) [error] org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178) [error] java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146) [error] java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [error] java.lang.Thread.run(Thread.java:701) After investigation, I found that this is caused by a bytecode incompatibility issue between RDD.class in spark-core_2.10-1.1.1.jar and the pre-built spark assembly respectively. This issue also happens with spark 1.1.0. Is there anything wrong in my usage of Spark? Or anything wrong in the process of deploying Spark module jars to maven repo? 
- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
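Shivaram's 'provided' suggestion, as an sbt sketch: the application jar then contains no Spark classes, so whatever assembly the cluster itself runs supplies them at runtime, avoiding the anonymous-function name mismatch entirely.

```scala
// build.sbt sketch (sbt 0.13-era syntax)
scalaVersion := "2.10.4"

// "provided": compile against spark-core, but do not bundle it.
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.1" % "provided"
```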
Re: spark-sql with join terribly slow.
This might be because Spark SQL first does a shuffle on both the tables involved in join on the Join condition as key. I had a specific use case of join where I always Join on specific column id and have an optimisation lined up for that in which i can cache the data partitioned on JOIN key id and could prevent the shuffle by passing the partition information to in-memory caching. See - https://issues.apache.org/jira/browse/SPARK-4849 Thanks -Nitin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-with-join-terribly-slow-tp20751p20756.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: When will Spark SQL support building DB index natively?
Thanks, I didn't try the partitioned table support (it sounds like a Hive feature). Is there any guideline? Should I use hiveContext to create the table with partitions first? On Thursday, December 18, 2014 2:28 AM, Michael Armbrust mich...@databricks.com wrote: - Dev list Have you looked at partitioned table support? That would only scan data where the predicate matches the partition. Depending on the cardinality of the customerId column that could be a good option for you. On Wed, Dec 17, 2014 at 2:25 AM, Xuelin Cao xuelin...@yahoo.com.invalid wrote: Hi, In the Spark SQL help document, it says "Some of these (such as indexes) are less important due to Spark SQL's in-memory computational model. Others are slotted for future releases of Spark SQL. - Block level bitmap indexes and virtual columns (used to build indexes)" For our use cases, a DB index is quite important. I have about 300G of data in our database, and we always use customer id as a predicate for DB lookups. Without a DB index, we have to scan all 300G of data, and a simple DB lookup takes 1 minute, while MySQL only takes 10 seconds. We tried to create an independent table for each customer id; the result is pretty good, but the logic will be very complex. I'm wondering when Spark SQL will support DB indexes, and before that, is there an alternative way to get the DB index function? Thanks
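A sketch of what Michael's partitioned-table suggestion could look like through hiveContext. The table and column names are hypothetical, and this relies on Hive's partitioning support, so it needs a HiveContext rather than a plain SQLContext.

```scala
// Partition by customer_id so a query with a customer predicate only
// scans that customer's partition instead of the full 300G.
hiveContext.sql("""
  CREATE TABLE events (page STRING, ts BIGINT)
  PARTITIONED BY (customer_id STRING)
""")

// Only the customer_id='12345' partition is read.
hiveContext.sql("SELECT count(*) FROM events WHERE customer_id = '12345'")
```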
Re: Unable to start Spark 1.3 after building:java.lang. NoClassDefFoundError: org/codehaus/jackson/map/deser/std/StdDeserializer
Hi there, The following are my steps, and I got the same exception as Daniel. Another question: how can I build a tgz file like the pre-built file I download from the official website? 1. download trunk from git. 2. add the following lines in pom.xml: + <profile> + <id>hadoop-2.6</id> + <properties> + <hadoop.version>2.6.0</hadoop.version> + <protobuf.version>2.5.0</protobuf.version> + <jets3t.version>0.9.0</jets3t.version> + </properties> + </profile> 3. run mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 -DskipTests clean package 4. in $SPARK_HOME, run the following command: ./bin/spark-submit --master yarn-cluster --class org.apache.spark.examples.SparkPi lib/spark-examples*.jar 10 Kyle 2014-12-18 2:24 GMT+08:00 Daniel Haviv danielru...@gmail.com: Thanks for your replies. I was building spark from trunk. Daniel On Dec 17, 2014, at 19:49, Nicholas Chammas nicholas.cham...@gmail.com wrote: Thanks for the correction, Sean. Do the docs need to be updated on this point, or is it safer for now just to note 2.4 specifically? On Wed Dec 17 2014 at 5:54:53 AM Sean Owen so...@cloudera.com wrote: Spark works fine with 2.4 *and later*. The docs don't mean to imply 2.4 is the last supported version. On Wed, Dec 17, 2014 at 10:19 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Spark 1.3 does not exist. Spark 1.2 hasn't been released just yet. Which version of Spark did you mean? Also, from what I can see in the docs, I believe the latest version of Hadoop that Spark supports is 2.4, not 2.6. Nick On Wed Dec 17 2014 at 2:09:56 AM Kyle Lin kylelin2...@gmail.com wrote: I also got the same problem.. 2014-12-09 22:58 GMT+08:00 Daniel Haviv danielru...@gmail.com: Hi, I've built spark 1.3 with hadoop 2.6 but when I startup the spark-shell I get the following exception: 14/12/09 06:54:24 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040 14/12/09 06:54:24 INFO util.Utils: Successfully started service 'SparkUI' on port 4040. 
14/12/09 06:54:24 INFO ui.SparkUI: Started SparkUI at http://hdname:4040 14/12/09 06:54:25 INFO impl.TimelineClientImpl: Timeline service address: http://0.0.0.0:8188/ws/v1/timeline/ java.lang.NoClassDefFoundError: org/codehaus/jackson/map/deser/std/StdDeserializer at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:800) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) Any idea why? Thanks, Daniel
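On Kyle's tgz question: the Spark source tree ships a make-distribution.sh script that wraps the maven build and produces a tarball like the official pre-built downloads. A sketch; the accepted flags changed between Spark versions, so check the script's own usage output before relying on this exact invocation.

```shell
# From the Spark source root; emits spark-<version>-bin-*.tgz.
./make-distribution.sh --tgz -Pyarn -Dhadoop.version=2.6.0
```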
Re: spark streaming kafka best practices?
Hi, On Thu, Dec 18, 2014 at 3:08 AM, Patrick Wendell pwend...@gmail.com wrote: On Wed, Dec 17, 2014 at 5:43 AM, Gerard Maas gerard.m...@gmail.com wrote: I was wondering why one would choose for rdd.map vs rdd.foreach to execute a side-effecting function on an RDD. Personally, I like to get the count of processed items, so I do something like rdd.map(item = processItem(item)).count() instead of rdd.foreach(item = processItem(item)) but I would be happy to learn about a better way. Tobias
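Tobias's pattern and the plain foreach alternative, side by side (processItem stands in for whatever side-effecting function is being run):

```scala
// Plain side effect over each element; returns Unit.
rdd.foreach(item => processItem(item))

// Same side effect, but count() both forces the lazy map to run
// and reports how many items were processed.
val processed = rdd.map(item => processItem(item)).count()
```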
Spark Shell slowness on Google Cloud
All, I'm using the Spark shell to interact with a small test deployment of Spark, built from the current master branch. I'm processing a dataset comprising a few thousand objects on Google Cloud Storage, split into a half dozen directories. My code constructs an object--let me call it the Dataset object--that defines a distinct RDD for each directory. The constructor of the object only defines the RDDs; it does not actually evaluate them, so I would expect it to return very quickly. Indeed, the logging code in the constructor prints a line signaling the completion of the code almost immediately after invocation, but the Spark shell does not show the prompt right away. Instead, it spends a few minutes seemingly frozen, eventually producing the following output: 14/12/18 05:52:49 INFO mapred.FileInputFormat: Total input paths to process : 9 14/12/18 05:54:15 INFO mapred.FileInputFormat: Total input paths to process : 759 14/12/18 05:54:40 INFO mapred.FileInputFormat: Total input paths to process : 228 14/12/18 06:00:11 INFO mapred.FileInputFormat: Total input paths to process : 3076 14/12/18 06:02:02 INFO mapred.FileInputFormat: Total input paths to process : 1013 14/12/18 06:02:21 INFO mapred.FileInputFormat: Total input paths to process : 156 This stage is inexplicably slow. What could be happening? Thanks. Alex
Re: Spark Shell slowness on Google Cloud
I'm curious if you're seeing the same thing when using bdutil against GCS? I'm wondering if this may be an issue concerning the transfer rate of Spark -> Hadoop -> GCS Connector -> GCS. On Wed Dec 17 2014 at 10:09:17 PM Alessandro Baretta alexbare...@gmail.com wrote: All, I'm using the Spark shell to interact with a small test deployment of Spark, built from the current master branch. I'm processing a dataset comprising a few thousand objects on Google Cloud Storage, split into a half dozen directories. My code constructs an object--let me call it the Dataset object--that defines a distinct RDD for each directory. The constructor of the object only defines the RDDs; it does not actually evaluate them, so I would expect it to return very quickly. Indeed, the logging code in the constructor prints a line signaling the completion of the code almost immediately after invocation, but the Spark shell does not show the prompt right away. Instead, it spends a few minutes seemingly frozen, eventually producing the following output: 14/12/18 05:52:49 INFO mapred.FileInputFormat: Total input paths to process : 9 14/12/18 05:54:15 INFO mapred.FileInputFormat: Total input paths to process : 759 14/12/18 05:54:40 INFO mapred.FileInputFormat: Total input paths to process : 228 14/12/18 06:00:11 INFO mapred.FileInputFormat: Total input paths to process : 3076 14/12/18 06:02:02 INFO mapred.FileInputFormat: Total input paths to process : 1013 14/12/18 06:02:21 INFO mapred.FileInputFormat: Total input paths to process : 156 This stage is inexplicably slow. What could be happening? Thanks. Alex
Re: Spark Shell slowness on Google Cloud
Denny,

No, gsutil scans through the listing of the bucket quickly. See the following:

alex@hadoop-m:~/split$ time bash -c 'gsutil ls gs://my-bucket/20141205/csv/*/*/* | wc -l'
6860

real    0m6.971s
user    0m1.052s
sys     0m0.096s

Alex
Re: Spark Shell slowness on Google Cloud
Oh, it makes sense that gsutil scans through this quickly, but I was wondering whether running a Hadoop job / bdutil would result in just as fast scans?
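One way to run the comparison being suggested here (the bucket path is the one from Alex's earlier timing; this assumes the GCS connector is configured on the cluster) is to time the same listing through the Hadoop FileSystem layer and through gsutil directly:

```shell
# Listing directly via gsutil (fast in Alex's test):
time bash -c 'gsutil ls gs://my-bucket/20141205/csv/*/*/* | wc -l'

# The same listing through the Hadoop FileSystem / GCS connector layer:
time bash -c 'hadoop fs -ls gs://my-bucket/20141205/csv/*/*/* | wc -l'
```

If the second command is dramatically slower than the first, that would implicate the Hadoop/GCS-connector layer rather than Spark itself.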
Getting OutOfMemoryError and Worker.run caught exception
Hi guys,

Getting the following errors:

2014-12-17 09:05:02,391 [SocialInteractionDAL.scala:Executor task launch worker-110:20] - --- Inserting into mongo -
2014-12-17 09:05:06,768 [Logging.scala:Executor task launch worker-110:96] - Exception in task 1.0 in stage 19541.0 (TID 33982)
java.lang.OutOfMemoryError: GC overhead limit exceeded
    at org.bson.io.PoolOutputBuffer.<init>(PoolOutputBuffer.java:253)
    at org.bson.BasicBSONDecoder.<init>(BasicBSONDecoder.java:599)
    at com.mongodb.DefaultDBDecoder.<init>(DefaultDBDecoder.java:44)
    at com.mongodb.DefaultDBDecoder$DefaultFactory.create(DefaultDBDecoder.java:33)
    at com.mongodb.DBPort.<init>(DBPort.java:88)
    at com.mongodb.DBPortFactory.create(DBPortFactory.java:28)
    at com.mongodb.PooledConnectionProvider$ConnectionItemFactory.create(PooledConnectionProvider.java:186)
    at com.mongodb.PooledConnectionProvider$ConnectionItemFactory.create(PooledConnectionProvider.java:183)
    at com.mongodb.ConcurrentPool.createNewAndReleasePermitIfFailure(ConcurrentPool.java:150)
    at com.mongodb.ConcurrentPool.get(ConcurrentPool.java:118)
    at com.mongodb.PooledConnectionProvider.get(PooledConnectionProvider.java:75)
    at com.mongodb.DefaultServer.getConnection(DefaultServer.java:73)
    at com.mongodb.BaseCluster$WrappedServer.getConnection(BaseCluster.java:219)
    at com.mongodb.DBTCPConnector$MyPort.getConnection(DBTCPConnector.java:511)
    at com.mongodb.DBTCPConnector$MyPort.get(DBTCPConnector.java:459)
    at com.mongodb.DBTCPConnector.getPrimaryPort(DBTCPConnector.java:417)
    at com.mongodb.DBCollectionImpl.insert(DBCollectionImpl.java:182)
    at com.mongodb.DBCollectionImpl.insert(DBCollectionImpl.java:165)
    at com.mongodb.DBCollection.insert(DBCollection.java:93)
    at com.mongodb.casbah.MongoCollectionBase$class.insert(MongoCollection.scala:621)
    at com.mongodb.casbah.MongoCollection.insert(MongoCollection.scala:1109)
    at com.mongodb.casbah.MongoCollectionBase$class.insert(MongoCollection.scala:606)
    at com.mongodb.casbah.MongoCollection.insert(MongoCollection.scala:1109)
    at com.newscred.analytics.db.mongo.SocialInteractionDAL.insert(SocialInteractionDAL.scala:25)
    at com.newscred.analytics.streaming.AnalyticsStreamProcessor$$anonfun$process$1.apply(AnalyticsStreamProcessor.scala:16)
    at com.newscred.analytics.streaming.AnalyticsStreamProcessor$$anonfun$process$1.apply(AnalyticsStreamProcessor.scala:11)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:759)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:759)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)

And:

2014-12-18 01:49:09,770 [AnalyticsStreamProcessor.scala:pool-12-thread-2:10] - Starting processing ...
2014-12-18 01:49:38,050 [Slf4jLogger.scala:sparkDriver-akka.actor.default-dispatcher-1201:71] - unhandled event Failure(akka.pattern.AskTimeoutException: Timed out) in state WaitTransportShutdown
2014-12-18 01:51:00,576 [Logging.scala:Spark Context ContextCleaner:96] - Error in cleaning thread
java.lang.InterruptedException
    at java.lang.Object.wait(Native Method)
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:142)
    at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:136)
    at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply(ContextCleaner.scala:134)
    at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply(ContextCleaner.scala:134)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311)
    at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:133)
    at org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:65)
2014-12-18 01:52:11,688 [Logging.scala:SparkListenerBus:96] - Uncaught exception in thread SparkListenerBus
java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
    at java.util.concurrent.Semaphore.acquire(Semaphore.java:312)
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(LiveListenerBus.scala:48)
    at
Re: Spark Shell slowness on Google Cloud
Well, what do you suggest I run to test this? But more importantly, what information would this give me?
Re: Getting OutOfMemoryError and Worker.run caught exception
You can go through this doc for tuning: http://spark.apache.org/docs/latest/tuning.html

It looks like you are creating a lot of objects and the JVM is spending most of its time clearing them. If you can paste the code snippet, it will be easier to understand what's happening.

Thanks
Best Regards
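Beyond the tuning guide: the DBPort and decoder allocations at the top of the stack often come from opening a Mongo connection per record. A hedged sketch of the usual fix, opening one client per partition and reusing it (the host, database, and collection names here are placeholders, and the Casbah-style calls are assumed from the stack trace, not taken from the poster's code):

```scala
import com.mongodb.casbah.Imports._

rdd.foreachPartition { records =>
  // One client per partition instead of one per record,
  // to cut connection and buffer object churn.
  val client = MongoClient("mongo-host")            // placeholder host
  val coll   = client("analytics")("interactions")  // placeholder db/collection
  try {
    records.foreach(r => coll.insert(MongoDBObject("value" -> r)))
  } finally {
    client.close()
  }
}
```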
Re: Spark Shell slowness on Google Cloud
For Spark to connect to GCS, it uses the Hadoop and GCS connector jars, and I'm wondering if those connection points are what is ultimately slowing down the connection between Spark and GCS. The reason I asked you to run bdutil is that it is basically Hadoop connecting to GCS: if it is just as slow, that would point to the root cause, i.e. the Hadoop connection slowing things down rather than something in Spark per se.
Re: Spark Shell slowness on Google Cloud
Here's another data point: the slow part of my code is the construction of an RDD as the union of the textFile RDDs representing data from several distinct Google Cloud Storage directories. So the question becomes: what computation happens when calling the union method on two RDDs?
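As a hedged sketch of an answer: a union RDD's partition list is the concatenation of its parents' partitions, so computing it forces each underlying textFile RDD to list its input paths, which is consistent with the per-directory "Total input paths to process" lines above. One way to avoid constructing many RDDs and unioning them is that sc.textFile accepts a comma-separated list of paths (the paths below are hypothetical; whether this speeds up the GCS listing itself is not established in this thread):

```scala
// A single RDD over all directories; sc.textFile accepts
// a comma-separated list of input paths.
val dirs = Seq(
  "gs://my-bucket/20141205/csv/a",
  "gs://my-bucket/20141205/csv/b")
val all = sc.textFile(dirs.mkString(","))
```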