Re: KMeans code is rubbish
The problem is that I get the same results every time.

On Friday, July 11, 2014 7:22 PM, Ameet Talwalkar atalwal...@gmail.com wrote: Hi Wanda, As Sean mentioned, K-means is not guaranteed to find an optimal answer, even for seemingly simple toy examples. A common heuristic to deal with this issue is to run kmeans multiple times and choose the best answer. You can do this by changing the runs parameter from the default value (1) to something larger (say 10). -Ameet

On Fri, Jul 11, 2014 at 1:20 AM, Wanda Hawk wanda_haw...@yahoo.com wrote: I also took a look at spark-1.0.0/examples/src/main/scala/org/apache/spark/examples/mllib/DenseKMeans.scala and ran the code in a shell. There is an issue here: val initMode = params.initializationMode match { case Random => KMeans.RANDOM case Parallel => KMeans.K_MEANS_PARALLEL } If I use initMode=KMeans.RANDOM everything is ok. If I use initMode=KMeans.K_MEANS_PARALLEL I get a wrong result, and I do not know why. The example proposed is a really simple one that should not admit multiple solutions and should always converge to the correct one. Now what can be altered in the original SparkKMeans.scala (the seed or something else?) to get the correct results each and every single time?

On Thursday, July 10, 2014 7:58 PM, Xiangrui Meng men...@gmail.com wrote: SparkKMeans is a naive implementation. Please use mllib.clustering.KMeans in practice. I created a JIRA for this: https://issues.apache.org/jira/browse/SPARK-2434 -Xiangrui

On Thu, Jul 10, 2014 at 2:45 AM, Tathagata Das tathagata.das1...@gmail.com wrote: I ran the SparkKMeans example (not the mllib KMeans that Sean ran) with your dataset as well, and I got the expected answer. And I believe that even though initialization is done using sampling, the example actually sets the seed to a constant 42, so the result should always be the same no matter how many times you run it. So I am not really sure what's going on here. Can you tell us more about which version of Spark you are running? Which Java version? == [tdas @ Xion spark2] cat input 2 1 1 2 3 2 2 3 4 1 5 1 6 1 4 2 6 2 4 3 5 3 6 3 [tdas @ Xion spark2] ./bin/run-example SparkKMeans input 2 0.001 2014-07-10 02:45:06.764 java[45244:d17] Unable to load realm info from SCDynamicStore 14/07/10 02:45:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... 
using builtin-java classes where applicable 14/07/10 02:45:07 WARN LoadSnappy: Snappy native library not loaded 14/07/10 02:45:08 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS 14/07/10 02:45:08 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS Finished iteration (delta = 3.0) Finished iteration (delta = 0.0) Final centers: DenseVector(5.0, 2.0) DenseVector(2.0, 2.0) On Thu, Jul 10, 2014 at 2:17 AM, Wanda Hawk wanda_haw...@yahoo.com wrote: so this is what I am running: ./bin/run-example SparkKMeans ~/Documents/2dim2.txt 2 0.001 And this is the input file: ┌───[spark2013@SparkOne]──[~/spark-1.0.0].$ └───#!cat ~/Documents/2dim2.txt 2 1 1 2 3 2 2 3 4 1 5 1 6 1 4 2 6 2 4 3 5 3 6 3 This is the final output from spark: 14/07/10 20:05:12 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks 14/07/10 20:05:12 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 0 ms 14/07/10 20:05:12 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329 14/07/10 20:05:12 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks 14/07/10 20:05:12 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 0 ms 14/07/10 20:05:12 INFO Executor: Serialized size of result for 14 is 1433 14/07/10 20:05:12 INFO Executor: Sending result for 14 directly to driver 14/07/10 20:05:12 INFO Executor: Finished task ID 14 14/07/10 20:05:12 INFO DAGScheduler: Completed ResultTask(6, 0) 14/07/10 20:05:12 INFO TaskSetManager: Finished TID 14 in 5 ms on localhost (progress: 1/2) 14/07/10 20:05:12 INFO Executor: Serialized size of result for 15 is 1433 14/07/10 20:05:12 INFO Executor: Sending result for 15 directly to driver 14/07/10 20:05:12 INFO Executor: Finished task ID 15 14/07/10 20:05:12 INFO DAGScheduler: Completed ResultTask(6, 1) 14/07/10 20:05:12 INFO TaskSetManager: Finished TID 15 in 7 ms on localhost (progress: 2/2) 14/07/10 20:05:12 INFO DAGScheduler: Stage 6 (collectAsMap at SparkKMeans.scala:75) finished in 0.008 s 14/07/10 20:05:12 INFO TaskSchedulerImpl: Removed TaskSet 6.0, whose tasks have all completed, from pool 14/07/10 20:05:12 INFO SparkContext: Job finished: collectAsMap at SparkKMeans.scala:75, took 0.02472681 s Finished iteration (delta = 0.0) Final centers:
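For reference, a minimal sketch (not from the thread) of the multiple-runs heuristic Ameet describes, written against the Spark 1.0 MLlib API (the KMeans.train overload taking k, maxIterations, runs and an initialization mode); the file name is the one used in the thread:

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

// Parse the whitespace-separated 2D points into MLlib vectors.
val data = sc.textFile("2dim2.txt")
  .map(line => Vectors.dense(line.trim.split(' ').map(_.toDouble)))
  .cache()

// runs = 10 repeats the clustering with different random starts and keeps the
// model with the lowest cost; initialization can be KMeans.RANDOM or KMeans.K_MEANS_PARALLEL.
val model = KMeans.train(data, 2, 20, 10, KMeans.RANDOM)
println(model.clusterCenters.mkString(", "))
println(model.computeCost(data))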
Re: mapPartitionsWithIndex
You should return an iterator in mapPartitionsWithIndex. This is from the programming guide (http://spark.apache.org/docs/latest/programming-guide.html): mapPartitionsWithIndex(func): Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator[T]) => Iterator[U] when running on an RDD of type T. For your case, try something similar to the following: val keyval = dRDD.mapPartitionsWithIndex { (ind, iter) => iter.map(x => process(ind, x.trim().split(' ').map(_.toDouble), q, m, r)) } -Xiangrui

On Sun, Jul 13, 2014 at 11:26 PM, Madhura das.madhur...@gmail.com wrote: I have a text file consisting of a large number of random floating values separated by spaces. I am loading this file into an RDD in Scala. I have heard of mapPartitionsWithIndex but I haven't been able to implement it. For each partition I want to call a method (process in this case) to which I want to pass the partition and its respective index as parameters. My method returns a pair of values. This is what I have done: val dRDD = sc.textFile("hdfs://master:54310/Data/input*") var ind:Int=0 val keyval = dRDD.mapPartitionsWithIndex((ind,x) => process(ind,x,...)) val res = keyval.collect() We are not able to access res(0)._1 and res(0)._2. The error log is as follows. [error] SimpleApp.scala:420: value trim is not a member of Iterator[String] [error] Error occurred in an application involving default arguments. [error] val keyval=dRDD.mapPartitionsWithIndex( (ind,x) => process(ind,x.trim().split(' ').map(_.toDouble),q,m,r)) [error] ^ [error] SimpleApp.scala:425: value mkString is not a member of Array[Nothing] [error] println(res.mkString()) [error] ^ [error] /SimpleApp.scala:427: value _1 is not a member of Nothing [error] var final= res(0)._1 [error] ^ [error] /home/madhura/DTWspark/src/main/scala/SimpleApp.scala:428: value _2 is not a member of Nothing [error] var final1 = res(0)._2 - m +1 [error] ^ -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/mapPartitionsWithIndex-tp9590.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
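A self-contained toy sketch of the iterator contract Xiangrui points out; process here is only a stand-in (the real one in the thread takes extra q, m, r arguments), and nothing beyond core RDD methods is assumed:

// Placeholder process: pairs the partition index with a per-line sum.
def process(ind: Int, values: Array[Double]): (Int, Double) = (ind, values.sum)

val dRDD = sc.textFile("hdfs://master:54310/Data/input*")
val keyval = dRDD.mapPartitionsWithIndex { (ind, iter) =>
  // Return an Iterator, not a plain value: map over the partition's lines lazily.
  iter.map(line => process(ind, line.trim.split(' ').map(_.toDouble)))
}
val res = keyval.collect()  // Array[(Int, Double)]; res(0)._1 and res(0)._2 are now accessible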
Error when testing with large sparse svm
Hi, I encounter an error when testing svm (example one) on very large sparse data. The dataset I ran on was a toy dataset with only ten examples but 13 million sparse vector with a few thousands non-zero entries. The errors is showing below. I am wondering is this a bug or I am missing something? 14/07/13 23:59:44 INFO SecurityManager: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 14/07/13 23:59:44 INFO SecurityManager: Changing view acls to: chengjie 14/07/13 23:59:44 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(chengjie) 14/07/13 23:59:45 INFO Slf4jLogger: Slf4jLogger started 14/07/13 23:59:45 INFO Remoting: Starting remoting 14/07/13 23:59:45 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@master:53173] 14/07/13 23:59:45 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@master:53173] 14/07/13 23:59:45 INFO SparkEnv: Registering MapOutputTracker 14/07/13 23:59:45 INFO SparkEnv: Registering BlockManagerMaster 14/07/13 23:59:45 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20140713235945-c78f 14/07/13 23:59:45 INFO MemoryStore: MemoryStore started with capacity 14.4 GB. 14/07/13 23:59:45 INFO ConnectionManager: Bound socket to port 37674 with id = ConnectionManagerId(master,37674) 14/07/13 23:59:45 INFO BlockManagerMaster: Trying to register BlockManager 14/07/13 23:59:45 INFO BlockManagerInfo: Registering block manager master:37674 with 14.4 GB RAM 14/07/13 23:59:45 INFO BlockManagerMaster: Registered BlockManager 14/07/13 23:59:45 INFO HttpServer: Starting HTTP Server 14/07/13 23:59:45 INFO HttpBroadcast: Broadcast server started at http://10.10.255.128:41838 14/07/13 23:59:45 INFO HttpFileServer: HTTP File server directory is /tmp/spark-ac459d4b-a3c4-4577-bad4-576ac427d0bf 14/07/13 23:59:45 INFO HttpServer: Starting HTTP Server 14/07/13 23:59:51 INFO SparkUI: Started SparkUI at http://master:4040 14/07/13 23:59:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 14/07/13 23:59:52 INFO EventLoggingListener: Logging events to /tmp/spark-events/binaryclassification-with-params(hdfs---master-9001-splice.small,1,1.0,svm,l1,0.1)-1405317591776 14/07/13 23:59:52 INFO SparkContext: Added JAR file:/home/chengjie/spark-1.0.1/examples/target/scala-2.10/spark-examples-1.0.1-hadoop2.3.0.jar at http://10.10.255.128:54689/jars/spark-examples-1.0.1-hadoop2.3.0.jar with timestamp 1405317592653 14/07/13 23:59:52 INFO AppClient$ClientActor: Connecting to master spark://master:7077... 
14/07/14 00:00:08 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory 14/07/14 00:00:23 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory 14/07/14 00:00:38 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory 14/07/14 00:00:53 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory Training: 10 14/07/14 00:01:09 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS 14/07/14 00:01:09 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS *Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 20:0 was 94453098 bytes which exceeds spark.akka.frameSize (10485760 bytes). Consider using broadcast variables for large values.* at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1044) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1028) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1026) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1026) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:634) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1229) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at
Re: Catalyst dependency on Spark Core
Making Catalyst independent of Spark is the goal of Catalyst, but it may need time and evolution. I noticed that the package org.apache.spark.sql.catalyst.util imports org.apache.spark.util.{Utils => SparkUtils}, so Catalyst currently has a dependency on Spark core. I'm not sure whether it will be replaced by another component independent of Spark in a later release.

2014-07-14 11:51 GMT+08:00 Aniket Bhatnagar aniket.bhatna...@gmail.com: As per the recent presentation given at Scala Days (http://people.apache.org/~marmbrus/talks/SparkSQLScalaDays2014.pdf), it was mentioned that Catalyst is independent of Spark. But on inspecting the pom.xml of the sql/catalyst module, it seems it has a dependency on Spark Core. Any particular reason for the dependency? I would love to use Catalyst outside Spark (reposted as the previous email bounced. Sorry if this is a duplicate).
Re: Supported SQL syntax in Spark SQL
I am very interested in the original question as well, is there any list (even if it is simply in the code) of all supported syntax for Spark SQL? On Mon, Jul 14, 2014 at 6:41 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Are you sure the code running on the cluster has been updated? I launched the cluster using spark-ec2 from the 1.0.1 release, so I’m assuming that’s taken care of, at least in theory. I just spun down the clusters I had up, but I will revisit this tomorrow and provide the information you requested. Nick -- Mvh. Martin Gammelsæter 92209139
Re: Error when testing with large sparse svm
You need to set a larger `spark.akka.frameSize`, e.g., 128, for the serialized weight vector. There is a JIRA about switching automatically between sending through akka or broadcast: https://issues.apache.org/jira/browse/SPARK-2361 . -Xiangrui On Mon, Jul 14, 2014 at 12:15 AM, crater cq...@ucmerced.edu wrote: Hi, I encounter an error when testing svm (example one) on very large sparse data. The dataset I ran on was a toy dataset with only ten examples but 13 million sparse vector with a few thousands non-zero entries. The errors is showing below. I am wondering is this a bug or I am missing something? 14/07/13 23:59:44 INFO SecurityManager: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 14/07/13 23:59:44 INFO SecurityManager: Changing view acls to: chengjie 14/07/13 23:59:44 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(chengjie) 14/07/13 23:59:45 INFO Slf4jLogger: Slf4jLogger started 14/07/13 23:59:45 INFO Remoting: Starting remoting 14/07/13 23:59:45 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@master:53173] 14/07/13 23:59:45 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@master:53173] 14/07/13 23:59:45 INFO SparkEnv: Registering MapOutputTracker 14/07/13 23:59:45 INFO SparkEnv: Registering BlockManagerMaster 14/07/13 23:59:45 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20140713235945-c78f 14/07/13 23:59:45 INFO MemoryStore: MemoryStore started with capacity 14.4 GB. 14/07/13 23:59:45 INFO ConnectionManager: Bound socket to port 37674 with id = ConnectionManagerId(master,37674) 14/07/13 23:59:45 INFO BlockManagerMaster: Trying to register BlockManager 14/07/13 23:59:45 INFO BlockManagerInfo: Registering block manager master:37674 with 14.4 GB RAM 14/07/13 23:59:45 INFO BlockManagerMaster: Registered BlockManager 14/07/13 23:59:45 INFO HttpServer: Starting HTTP Server 14/07/13 23:59:45 INFO HttpBroadcast: Broadcast server started at http://10.10.255.128:41838 14/07/13 23:59:45 INFO HttpFileServer: HTTP File server directory is /tmp/spark-ac459d4b-a3c4-4577-bad4-576ac427d0bf 14/07/13 23:59:45 INFO HttpServer: Starting HTTP Server 14/07/13 23:59:51 INFO SparkUI: Started SparkUI at http://master:4040 14/07/13 23:59:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 14/07/13 23:59:52 INFO EventLoggingListener: Logging events to /tmp/spark-events/binaryclassification-with-params(hdfs---master-9001-splice.small,1,1.0,svm,l1,0.1)-1405317591776 14/07/13 23:59:52 INFO SparkContext: Added JAR file:/home/chengjie/spark-1.0.1/examples/target/scala-2.10/spark-examples-1.0.1-hadoop2.3.0.jar at http://10.10.255.128:54689/jars/spark-examples-1.0.1-hadoop2.3.0.jar with timestamp 1405317592653 14/07/13 23:59:52 INFO AppClient$ClientActor: Connecting to master spark://master:7077... 
14/07/14 00:00:08 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory 14/07/14 00:00:23 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory 14/07/14 00:00:38 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory 14/07/14 00:00:53 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory Training: 10 14/07/14 00:01:09 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS 14/07/14 00:01:09 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS *Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 20:0 was 94453098 bytes which exceeds spark.akka.frameSize (10485760 bytes). Consider using broadcast variables for large values.* at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1044) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1028) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1026) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1026) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634) at
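A minimal sketch of the fix Xiangrui suggests, assuming the frame size is given in MB and must be set on the SparkConf before the SparkContext is created (the application name here is only illustrative):

import org.apache.spark.{SparkConf, SparkContext}

// 128 MB leaves headroom for the ~94 MB serialized task reported in the stack trace above.
val conf = new SparkConf()
  .setAppName("BinaryClassification")
  .set("spark.akka.frameSize", "128")
val sc = new SparkContext(conf)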
Re: Graphx traversal and merge interesting edges
Hi Ankur, FYI - in a naive attempt to enhance your solution, managed to create MergePatternPath. I think it works in expected way (atleast for the traversing problem in last email). I modified your code a bit. Also instead of EdgePattern I used List of Functions that match the whole edge triplets along the path... and it returns a *new Graph* which preserves the vertices attributes, but only with new merged edges. MergePatternPath: https://github.com/hihellobolke/spark/blob/graphx-traversal/graphx/src/main/scala/org/apache/spark/graphx/lib/MergePatternPath.scala Here's a Gist of how I was using it: https://gist.github.com/hihellobolke/c8e6c97cefed714258ad This prolly is very naive attempt :-). Is there any possibility of adding it to the graphx.lib albeit one which is sophisticated performant? Thanks On 08-Jul-2014, at 4:57 pm, HHB hihellobo...@gmail.com wrote: Hi Ankur, I was trying out the PatterMatcher it works for smaller path, but I see that for the longer ones it continues to run forever... Here's what I am trying: https://gist.github.com/hihellobolke/dd2dc0fcebba485975d1 (The example of 3 share traders transacting in appl shares) The first edge pattern list (Line 66) works okay, but the second one (Line 76) never return.. Thanks, Gautam On 05-Jul-2014, at 3:23 pm, Ankur Dave ankurd...@gmail.com wrote: Interesting problem! My understanding is that you want to (1) find paths matching a particular pattern, and (2) add edges between the start and end vertices of the matched paths. For (1), I implemented a pattern matcher for GraphX that iteratively accumulates partial pattern matches. I used your example in the unit test. For (2), you can take the output of the pattern matcher (the set of matching paths organized by their terminal vertices) and construct a set of new edges using the initial and terminal vertices of each path. Then you can make a new graph consisting of the union of the original edge set and the new edges. Let me know if you'd like help with this. Ankur
Re: mapPartitionsWithIndex
It worked! I was struggling for a week. Thanks a lot!

On Mon, Jul 14, 2014 at 12:31 PM, Xiangrui Meng [via Apache Spark User List] ml-node+s1001560n9591...@n3.nabble.com wrote: You should return an iterator in mapPartitionsWithIndex. This is from the programming guide (http://spark.apache.org/docs/latest/programming-guide.html): mapPartitionsWithIndex(func): Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator[T]) => Iterator[U] when running on an RDD of type T. For your case, try something similar to the following: val keyval = dRDD.mapPartitionsWithIndex { (ind, iter) => iter.map(x => process(ind, x.trim().split(' ').map(_.toDouble), q, m, r)) } -Xiangrui

On Sun, Jul 13, 2014 at 11:26 PM, Madhura das.madhur...@gmail.com wrote: I have a text file consisting of a large number of random floating values separated by spaces. I am loading this file into an RDD in Scala. I have heard of mapPartitionsWithIndex but I haven't been able to implement it. For each partition I want to call a method (process in this case) to which I want to pass the partition and its respective index as parameters. My method returns a pair of values. This is what I have done: val dRDD = sc.textFile("hdfs://master:54310/Data/input*") var ind:Int=0 val keyval = dRDD.mapPartitionsWithIndex((ind,x) => process(ind,x,...)) val res = keyval.collect() We are not able to access res(0)._1 and res(0)._2. The error log is as follows. [error] SimpleApp.scala:420: value trim is not a member of Iterator[String] [error] Error occurred in an application involving default arguments. [error] val keyval=dRDD.mapPartitionsWithIndex( (ind,x) => process(ind,x.trim().split(' ').map(_.toDouble),q,m,r)) [error] ^ [error] SimpleApp.scala:425: value mkString is not a member of Array[Nothing] [error] println(res.mkString()) [error] ^ [error] /SimpleApp.scala:427: value _1 is not a member of Nothing [error] var final= res(0)._1 [error] ^ [error] /home/madhura/DTWspark/src/main/scala/SimpleApp.scala:428: value _2 is not a member of Nothing [error] var final1 = res(0)._2 - m +1 [error] ^ -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/mapPartitionsWithIndex-tp9590.html Sent from the Apache Spark User List mailing list archive at Nabble.com.

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/mapPartitionsWithIndex-tp9590p9598.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
spark1.0.1 catalyst transform filter not push down
Hi, I encountered a weird problem in Spark SQL. I use sbt/sbt hive/console to go into the shell, and I am testing filter push-down with Catalyst.

scala> val queryPlan = sql("select value from (select key,value from src) a where a.key=86")
scala> queryPlan.baseLogicalPlan
res0: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project ['value]
 Filter ('a.key = 86)
  Subquery a
   Project ['key,'value]
    UnresolvedRelation None, src, None

I want to achieve the filter push-down, so I run:

scala> var newQuery = queryPlan.baseLogicalPlan transform {
     |   case f @ Filter(_, p @ Project(_,grandChild))
     |     if (f.references subsetOf grandChild.output) =>
     |       p.copy(child = f.copy(child = grandChild))
     | }
<console>:42: error: type mismatch;
 found   : Seq[org.apache.spark.sql.catalyst.expressions.Attribute]
 required: scala.collection.GenSet[org.apache.spark.sql.catalyst.expressions.Attribute]
         if (f.references subsetOf grandChild.output) =>
                                              ^

It throws the exception above and I don't know what's wrong. If I run it with the guard replaced by true:

scala> var newQuery = queryPlan.baseLogicalPlan transform {
     |   case f @ Filter(_, p @ Project(_,grandChild))
     |     if true =>
     |       p.copy(child = f.copy(child = grandChild))
     | }
newQuery: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project ['value]
 Filter ('a.key = 86)
  Subquery a
   Project ['key,'value]
    UnresolvedRelation None, src, None

It seems the Filter is still in the same position; the order did not switch. Can anyone guide me on this?

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark1-0-1-catalyst-transform-filter-not-push-down-tp9599.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
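As a sketch only (not from the thread): the compiler error says references is a Seq while subsetOf needs a Set, so one way to make the guard type-check is to compare explicit sets. Whether the rewrite then actually fires is a separate question, since (as the reply further down explains) baseLogicalPlan is only the parsed, unanalyzed plan:

import org.apache.spark.sql.catalyst.plans.logical.{Filter, Project}

val newQuery = queryPlan.baseLogicalPlan transform {
  // Convert both sides to Sets so subsetOf type-checks.
  case f @ Filter(_, p @ Project(_, grandChild))
      if f.references.toSet subsetOf grandChild.output.toSet =>
    p.copy(child = f.copy(child = grandChild))
}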
sbt + idea + test
Hi guys, I want to use Elasticsearch and HBase in my spark project, I want to create a test. I pulled up ES and Zookeeper, but if I put val htest = new HBaseTestingUtility() to my app I got a strange exception (compilation time, not runtime). https://gist.github.com/b0c1/4a4b3f6350816090c3b5 Any idea? -- Skype: boci13, Hangout: boci.b...@gmail.com
Running Spark on Microsoft Azure HDInsight
Hi everyone, Currently I am working on parallelizing a machine learning algorithm using a Microsoft HDInsight cluster. I tried running my algorithm on Hadoop MapReduce, but since my algorithm is iterative the job scheduling overhead and data loading overhead severely limits the performance of my algorithm in terms of training time. Since recently, HDInsight supports Hadoop 2 with YARN, which I thought would allow me to use run Spark jobs, which seem more fitting for my task. So far I have not been able however to find how I can run Apache Spark jobs on a HDInsight cluster. It seems like remote job submission (which would have my preference) is not possible for Spark on HDInsight, as REST endpoints for Oozie and templeton do not seem to support submission of Spark jobs. I also tried to RDP to the headnode for job submission from the headnode. On the headnode drives I can find other new YARN computation models like Tez and I also managed to run Tez jobs on it through YARN. However, Spark seems to be missing. Does this mean that HDInsight currently does not support Spark, even though it supports Hadoop versions with YARN? Or do I need to install Spark on the HDInsight cluster first, in some way? Or is there maybe something else that I'm missing and can I run Spark jobs on HDInsight some other way? Many thanks in advance! Kind regards, Niek Tax
Spark SQL 1.0.1 error on reading fixed length byte array
Hi, I am using spark-sql 1.0.1 to load parquet files generated from method described in: https://gist.github.com/massie/7224868 When I try to submit a select query with columns of type fixed length byte array, the following error pops up: 14/07/14 11:09:14 INFO scheduler.DAGScheduler: Failed to run take at basicOperators.scala:100 org.apache.spark.SparkDriverExecutionException: Execution error at org.apache.spark.scheduler.DAGScheduler.runLocallyWithinThread(DAGScheduler.scala:581) at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:559) Caused by: parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file s3n://foo/bar/part-r-0.snappy.parquet at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:177) at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:130) at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:122) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to (TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.rdd.RDD$$anonfun$27.apply(RDD.scala:989) at org.apache.spark.rdd.RDD$$anonfun$27.apply(RDD.scala:989) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083) at org.apache.spark.scheduler.DAGScheduler.runLocallyWithinThread(DAGScheduler.scala:574) ... 1 more Caused by: java.lang.ClassCastException: Expected instance of primitive converter but got org.apache.spark.sql.parquet.CatalystNativeArrayConverter at parquet.io.api.Converter.asPrimitiveConverter(Converter.java:30) at parquet.io.RecordReaderImplementation.init(RecordReaderImplementation.java:264) at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:60) at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:74) at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:110) at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:172) ... 24 more Is fixed length byte array supposed to work in this version? I noticed that other array types like int or string already work. Thanks, -- Pei-Lun
Error in spark: Exception in thread delete Spark temp dir
I am getting an error saying:

Exception in thread "delete Spark temp dir C:\Users\shawn\AppData\Local\Temp\spark-b4f1105c-d67b-488c-83f9-eff1d1b95786" java.io.IOException: Failed to delete: C:\Users\shawn\AppData\Local\Temp\spark-b4f1105c-d67b-488c-83f9-eff1d1b95786\tmppr36zu
at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:483)
at org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(Utils.scala:479)
at org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(Utils.scala:478)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:478)
at org.apache.spark.util.Utils$$anon$4.run(Utils.scala:212)

Can anyone help me out with it? If the logs are required, I can forward them. -- Rahul K Bhojwani 3rd Year B.Tech Computer Science and Engineering National Institute of Technology, Karnataka
Re: Problem reading in LZO compressed files
Nicholas, thanks nevertheless! I am going to spend some time to try and figure this out and report back :-) Ognen On 7/13/14, 7:05 PM, Nicholas Chammas wrote: I actually never got this to work, which is part of the reason why I filed that JIRA. Apart from using |--jar| when starting the shell, I don’t have any more pointers for you. :( On Sun, Jul 13, 2014 at 12:57 PM, Ognen Duzlevski ognen.duzlev...@gmail.com mailto:ognen.duzlev...@gmail.com wrote: Nicholas, Thanks! How do I make spark assemble against a local version of Hadoop? I have 2.4.1 running on a test cluster and I did SPARK_HADOOP_VERSION=2.4.1 sbt/sbt assembly but all it did was pull in hadoop-2.4.1 dependencies via sbt (which is sufficient for using a 2.4.1 HDFS). I am guessing my local version of Hadoop libraries/jars is not used. Alternatively, how do I add the hadoop-gpl-compression-0.1.0.jar (responsible for the lzo stuff) to this hand assembled Spark? I am running the spark-shell like this: bin/spark-shell --jars /home/ec2-user/hadoop/lib/hadoop-gpl-compression-0.1.0.jar and getting this: scala val f = sc.newAPIHadoopFile(hdfs://10.10.0.98:54310/data/1gram.lzo http://10.10.0.98:54310/data/1gram.lzo,classOf[com.hadoop.mapreduce.LzoTextInputFormat],classOf[org.apache.hadoop.io.LongWritable],classOf[org.apache.hadoop.io.Text]) 14/07/13 16:53:01 INFO MemoryStore: ensureFreeSpace(216014) called with curMem=0, maxMem=311387750 14/07/13 16:53:01 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 211.0 KB, free 296.8 MB) f: org.apache.spark.rdd.RDD[(org.apache.hadoop.io.LongWritable, org.apache.hadoop.io.Text)] = NewHadoopRDD[0] at newAPIHadoopFile at console:12 scala f.take(1) 14/07/13 16:53:08 INFO FileInputFormat: Total input paths to process : 1 java.lang.IncompatibleClassChangeError: Found interface org.apache.hadoop.mapreduce.JobContext, but class was expected at com.hadoop.mapreduce.LzoTextInputFormat.listStatus(LzoTextInputFormat.java:67) which makes me think something is not linked to something properly (not a Java expert unfortunately). Thanks! Ognen On 7/13/14, 10:35 AM, Nicholas Chammas wrote: If you’re still seeing gibberish, it’s because Spark is not using the LZO libraries properly. In your case, I believe you should be calling |newAPIHadoopFile()| instead of |textFile()|. For example: |sc.newAPIHadoopFile(s3n://datasets.elasticmapreduce/ngrams/books/20090715/eng-us-all/1gram/data, classOf[com.hadoop.mapreduce.LzoTextInputFormat], classOf[org.apache.hadoop.io.LongWritable], classOf[org.apache.hadoop.io.Text]) | On a side note, here’s a related JIRA issue: SPARK-2394: Make it easier to read LZO-compressed files from EC2 clusters https://issues.apache.org/jira/browse/SPARK-2394 Nick On Sun, Jul 13, 2014 at 10:49 AM, Ognen Duzlevski ognen.duzlev...@gmail.com mailto:ognen.duzlev...@gmail.com wrote: Hello, I have been trying to play with the Google ngram dataset provided by Amazon in form of LZO compressed files. I am having trouble understanding what is going on ;). I have added the compression jar and native library to the underlying Hadoop/HDFS installation, restarted the name node and the datanodes, Spark can obviously see the file but I get gibberish on a read. Any ideas? See output below: 14/07/13 14:39:19 INFO SparkContext: Added JAR file:/home/ec2-user/hadoop/lib/hadoop-gpl-compression-0.1.0.jar at http://10.10.0.100:40100/jars/hadoop-gpl-compression-0.1.0.jar with timestamp 1405262359777 14/07/13 14:39:20 INFO SparkILoop: Created spark context.. Spark context available as sc. 
scala val f = sc.textFile(hdfs://10.10.0.98:54310/data/1gram.lzo http://10.10.0.98:54310/data/1gram.lzo) 14/07/13 14:39:34 INFO MemoryStore: ensureFreeSpace(163793) called with curMem=0, maxMem=311387750 14/07/13 14:39:34 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 160.0 KB, free 296.8 MB) f: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at console:12 scala f.take(10) 14/07/13 14:39:43 INFO SparkContext: Job finished: take at console:15, took 0.419708348 s res0: Array[String] = Array(SEQ?!org.apache.hadoop.io.LongWritable?org.apache.hadoop.io.Text??#com.hadoop.compression.lzo.LzoCodec���\N�#^�??d^�k���\N�#^�??d^�k��3��??�3???�?? ?�?�?�m??��??hx??�??�???�??�??�??�??�??�? �?, �? �? �?,
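Putting the suggestions in this thread together, a hedged sketch of the newAPIHadoopFile call; it assumes the hadoop-gpl-compression jar is on the classpath (e.g. passed via --jars) and that the LZO codec jar was built against the same Hadoop major version the cluster runs, since the "Found interface ... but class was expected" error above is the classic symptom of a Hadoop 1 vs Hadoop 2 mismatch:

import com.hadoop.mapreduce.LzoTextInputFormat
import org.apache.hadoop.io.{LongWritable, Text}

val f = sc.newAPIHadoopFile(
  "hdfs://10.10.0.98:54310/data/1gram.lzo",
  classOf[LzoTextInputFormat],
  classOf[LongWritable],
  classOf[Text])

// The keys are byte offsets; the values are the decompressed text lines.
val lines = f.map(_._2.toString)
lines.take(1).foreach(println)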
Can we get a spark context inside a mapper
Hey, My question is for this situation: Suppose we have 10 files each containing list of features in each row. Task is that for each file cluster the features in that file and write the corresponding cluster along with it in a new file. So we have to generate 10 more files by applying clustering in each file individually. So can I do it this way, that get rdd of list of files and apply map. Inside the mapper function which will be handling each file, get another spark context and use Mllib kmeans to get the clustered output file. Please suggest the appropriate method to tackle this problem. Thanks, Rahul Kumar Bhojwani 3rd year, B.Tech Computer Science Engineering National Institute Of Technology, Karnataka 9945197359
Re: Spark Questions
Thanks for your answers Shuo Xiang and Aaron Davidson! Regards, -- *Gonzalo Zarza* | PhD in High-Performance Computing | Big-Data Specialist | *GLOBANT* | AR: +54 11 4109 1700 ext. 15494 | US: +1 877 215 5230 ext. 15494 | [image: Facebook] https://www.facebook.com/Globant [image: Twitter] http://www.twitter.com/globant [image: Youtube] http://www.youtube.com/Globant [image: Linkedin] http://www.linkedin.com/company/globant [image: Pinterest] http://pinterest.com/globant/ [image: Globant] http://www.globant.com/ On Sat, Jul 12, 2014 at 9:02 PM, Aaron Davidson ilike...@gmail.com wrote: I am not entirely certain I understand your questions, but let me assume you are mostly interested in SparkSQL and are thinking about your problem in terms of SQL-like tables. 1. Shuo Xiang mentioned Spark partitioning strategies, but in case you are talking about data partitioning or sharding as exist in Hive, SparkSQL does not currently support this, though it is on the roadmap. We can read from partitioned Hive tables, however. 2. If by entries/record you mean something like columns/row, SparkSQL does allow you to project out the columns you want, or select all columns. The efficiency of such a projection is determined by the how the data is stored, however: If your data is stored in an inherently row-based format, this projection will be no faster than doing an initial map() over the data to only select the desired columns. If it's stored in something like Parquet, or cached in memory, however, we would avoid ever looking at the unused columns. 3. Spark has a very generalized data source API, so it is capable of interacting with whatever data source. However, I don't think we currently have any SparkSQL connectors to RDBMSes that would support column pruning or other push-downs. This is all very much viable, however. On Fri, Jul 11, 2014 at 1:35 PM, Gonzalo Zarza gonzalo.za...@globant.com wrote: Hi all, We've been evaluating Spark for a long-term project. Although we've been reading several topics in forum, any hints on the following topics we'll be extremely welcomed: 1. Which are the data partition strategies available in Spark? How configurable are these strategies? 2. How would be the best way to use Spark if queries can touch only 3-5 entries/records? Which strategy is the best if they want to perform a full scan of the entries? 3. Is Spark capable of interacting with RDBMS? Thanks a lot! Best regards, -- *Gonzalo Zarza* | PhD in High-Performance Computing | Big-Data Specialist | *GLOBANT* | AR: +54 11 4109 1700 ext. 15494 | US: +1 877 215 5230 ext. 15494 | [image: Facebook] https://www.facebook.com/Globant [image: Twitter] http://www.twitter.com/globant [image: Youtube] http://www.youtube.com/Globant [image: Linkedin] http://www.linkedin.com/company/globant [image: Pinterest] http://pinterest.com/globant/ [image: Globant] http://www.globant.com/
Re: spark1.0.1 catalyst transform filter not push down
Hi, queryPlan.baseLogicalPlan is not the plan used to execution. Actually, the baseLogicalPlan of a SchemaRDD (queryPlan in your case) is just the parsed plan (the parsed plan will be analyzed, and then optimized. Finally, a physical plan will be created). The plan shows up after you execute val queryPlan = sql(select value from (select key,value from src)a where a.key=86 ) is the physical plan. Or, you can use queryPlan.queryExecution to see the Logical Plan, Optimized Logical Plan, and Physical Plan. You can find the physical plan is == Physical Plan == Project [value#3:0] Filter (key#2:1 = 86) HiveTableScan [value#3,key#2], (MetastoreRelation default, src, None), None Thanks, Yin On Mon, Jul 14, 2014 at 3:42 AM, victor sheng victorsheng...@gmail.com wrote: Hi, I encountered a weird problem in spark sql. I use sbt/sbt hive/console to go into the shell. I test the filter push down by using catalyst. scala val queryPlan = sql(select value from (select key,value from src)a where a.key=86 ) scala queryPlan.baseLogicalPlan res0: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = Project ['value] Filter ('a.key = 86) Subquery a Project ['key,'value] UnresolvedRelation None, src, None I want to achieve the Filter Push Down. So I run : scala var newQuery = queryPlan.baseLogicalPlan transform { | case f @ Filter(_, p @ Project(_,grandChild)) | if (f.references subsetOf grandChild.output) = | p.copy(child = f.copy(child = grandChild)) | } console:42: error: type mismatch; found : Seq[org.apache.spark.sql.catalyst.expressions.Attribute] required: scala.collection.GenSet[org.apache.spark.sql.catalyst.expressions.Attribute] if (f.references subsetOf grandChild.output) = ^ It throws exception above. I don't know what's wrong. If I run : var newQuery = queryPlan.baseLogicalPlan transform { case f @ Filter(_, p @ Project(_,grandChild)) if true = p.copy(child = f.copy(child = grandChild)) } scala var newQuery = queryPlan.baseLogicalPlan transform { | case f @ Filter(_, p @ Project(_,grandChild)) | if true = | p.copy(child = f.copy(child = grandChild)) | } newQuery: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = Project ['value] Filter ('a.key = 86) Subquery a Project ['key,'value] UnresolvedRelation None, src, None It seems the Filter also in the same position, not switch the order. Can anyone guide me about it? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark1-0-1-catalyst-transform-filter-not-push-down-tp9599.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
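For completeness, a small sketch of what this looks like in the sbt hive/console shell (field names as in the 1.0.x QueryExecution; they may differ in other versions):

val queryPlan = sql("select value from (select key,value from src) a where a.key=86")

println(queryPlan.queryExecution.logical)        // parsed plan
println(queryPlan.queryExecution.analyzed)       // resolved plan
println(queryPlan.queryExecution.optimizedPlan)  // after optimizer rules
println(queryPlan.queryExecution.executedPlan)   // physical plan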
Re: Running Spark on Microsoft Azure HDInsight
I'm a Spark and HDInsight novice, so I could be wrong... HDInsight is based on HDP2, so my guess here is that you have the option of installing/configuring Spark in cluster mode (YARN) or in standalone mode and package the Spark binaries with your job. Everything I seem to look at is related to UNIX shell scripts. So, one might need to pull apart some of these scripts to pick out how to run this on Windows. Interesting project... Marco On Mon, Jul 14, 2014 at 8:00 AM, Niek Tax niek...@gmail.com wrote: Hi everyone, Currently I am working on parallelizing a machine learning algorithm using a Microsoft HDInsight cluster. I tried running my algorithm on Hadoop MapReduce, but since my algorithm is iterative the job scheduling overhead and data loading overhead severely limits the performance of my algorithm in terms of training time. Since recently, HDInsight supports Hadoop 2 with YARN, which I thought would allow me to use run Spark jobs, which seem more fitting for my task. So far I have not been able however to find how I can run Apache Spark jobs on a HDInsight cluster. It seems like remote job submission (which would have my preference) is not possible for Spark on HDInsight, as REST endpoints for Oozie and templeton do not seem to support submission of Spark jobs. I also tried to RDP to the headnode for job submission from the headnode. On the headnode drives I can find other new YARN computation models like Tez and I also managed to run Tez jobs on it through YARN. However, Spark seems to be missing. Does this mean that HDInsight currently does not support Spark, even though it supports Hadoop versions with YARN? Or do I need to install Spark on the HDInsight cluster first, in some way? Or is there maybe something else that I'm missing and can I run Spark jobs on HDInsight some other way? Many thanks in advance! Kind regards, Niek Tax
Re: Announcing Spark 1.0.1
Hi Patrick, This is great news, but I nearly missed the announcement because it had scrolled off the folder view that I have Spark users list messages go to. 40+ new threads since you sent the email out on Friday evening. You might consider having someone on your team create a spark-announcement list so that it is easier to disseminate important information like this release announcement. Thanks again for all your hard work. I know you and the rest of the team are getting a million requests a day. Philip

On 07/11/2014 07:35 PM, Patrick Wendell wrote: I am happy to announce the availability of Spark 1.0.1! This release includes contributions from 70 developers. Spark 1.0.1 includes fixes across several areas of Spark, including the core API, PySpark, and MLlib. It also includes new features in Spark's (alpha) SQL library, including support for JSON data and performance and stability fixes. Visit the release notes[1] to read about this release or download[2] the release today. [1] http://spark.apache.org/releases/spark-release-1-0-1.html [2] http://spark.apache.org/downloads.html
Re: Running Spark on Microsoft Azure HDInsight
Looks like going with cluster mode is not a good idea: http://azure.microsoft.com/en-us/documentation/articles/hdinsight-administer-use-management-portal/ Seems like a non-HDInsight VM might be needed to make it the Spark master node. Marco On Mon, Jul 14, 2014 at 12:43 PM, Marco Shaw marco.s...@gmail.com wrote: I'm a Spark and HDInsight novice, so I could be wrong... HDInsight is based on HDP2, so my guess here is that you have the option of installing/configuring Spark in cluster mode (YARN) or in standalone mode and package the Spark binaries with your job. Everything I seem to look at is related to UNIX shell scripts. So, one might need to pull apart some of these scripts to pick out how to run this on Windows. Interesting project... Marco On Mon, Jul 14, 2014 at 8:00 AM, Niek Tax niek...@gmail.com wrote: Hi everyone, Currently I am working on parallelizing a machine learning algorithm using a Microsoft HDInsight cluster. I tried running my algorithm on Hadoop MapReduce, but since my algorithm is iterative the job scheduling overhead and data loading overhead severely limits the performance of my algorithm in terms of training time. Since recently, HDInsight supports Hadoop 2 with YARN, which I thought would allow me to use run Spark jobs, which seem more fitting for my task. So far I have not been able however to find how I can run Apache Spark jobs on a HDInsight cluster. It seems like remote job submission (which would have my preference) is not possible for Spark on HDInsight, as REST endpoints for Oozie and templeton do not seem to support submission of Spark jobs. I also tried to RDP to the headnode for job submission from the headnode. On the headnode drives I can find other new YARN computation models like Tez and I also managed to run Tez jobs on it through YARN. However, Spark seems to be missing. Does this mean that HDInsight currently does not support Spark, even though it supports Hadoop versions with YARN? Or do I need to install Spark on the HDInsight cluster first, in some way? Or is there maybe something else that I'm missing and can I run Spark jobs on HDInsight some other way? Many thanks in advance! Kind regards, Niek Tax
RE: writing FLume data to HDFS
I am not sure how to write it… I tried writing to the local file system using FileWriter and PrintWriter, inside the while loop. I am able to get the text and print it, but it fails when I use regular Java classes. Shouldn't I use regular Java classes here? Can I write only to HDFS? Do I have to create the file in HDFS using HDFS classes? I thought of using Spark's saveAsTextFile(), but I have a JavaRDD<SparkFlumeEvent> here, not a JavaRDD<AvroEvent>, so I am not sure whether saveAsTextFile() will work. I appreciate any guidance here. How do I get more code examples? Books, URLs?

flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
@Override
public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
String logRecord = null;
List<SparkFlumeEvent> events = eventsData.collect();
Iterator<SparkFlumeEvent> batchedEvents = events.iterator();
long t1 = System.currentTimeMillis();
AvroFlumeEvent avroEvent = null;
ByteBuffer bytePayload = null;
// All the user level data is carried as payload in the Flume event
while(batchedEvents.hasNext()) {
SparkFlumeEvent flumeEvent = batchedEvents.next();
avroEvent = flumeEvent.event();
bytePayload = avroEvent.getBody();
logRecord = new String(bytePayload.array());
System.out.println("LOG RECORD = " + logRecord);
// I was trying to write the data to hdfs here, but it fails…

From: Tathagata Das [mailto:tathagata.das1...@gmail.com] Sent: Friday, July 11, 2014 1:43 PM To: user@spark.apache.org Cc: u...@spark.incubator.apache.org Subject: Re: writing FLume data to HDFS

What is the error you are getting when you say "I was trying to write the data to hdfs..but it fails…"? TD

On Thu, Jul 10, 2014 at 1:36 PM, Sundaram, Muthu X. muthu.x.sundaram@sabre.com wrote: I am new to Spark. I am trying to do the following: Netcat --> Flume --> Spark Streaming (process Flume data) --> HDFS. My Flume config file has the following setup: source = netcat, sink = avrosink. Spark Streaming code: I am able to print data from Flume to the monitor, but I am struggling to create a file. In order to get the real data I need to convert the SparkEvent to an avroEvent. JavaRDD.saveAsTextFile() might not work, because the JavaRDD is a collection of SparkEvent. Do I need to convert this into a JavaRDD<AvroEvent>? Please share any code examples… Thanks. 
Code:

Duration batchInterval = new Duration(2000);
SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);
JavaDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port);
flumeStream.count();
flumeStream.foreachRDD(new Function2<JavaRDD<SparkFlumeEvent>, JavaRDD<SparkFlumeEvent>, Void>() {
@Override
public Void call(JavaRDD<SparkFlumeEvent> events1, JavaRDD<SparkFlumeEvent> events2) throws Exception {
events1.saveAsTextFile("output.txt");
return null;
}
});
/*flumeStream.count().map(new Function<Long, String>() {
@Override
public String call(Long in) {
return "Received " + in + " flume events.";
}
}).print();*/
flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
@Override
public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
String logRecord = null;
List<SparkFlumeEvent> events = eventsData.collect();
Iterator<SparkFlumeEvent> batchedEvents = events.iterator();
long t1 = System.currentTimeMillis();
AvroFlumeEvent avroEvent = null;
ByteBuffer bytePayload = null;
// All the user level data is carried as payload in the Flume event
while(batchedEvents.hasNext()) {
SparkFlumeEvent flumeEvent = batchedEvents.next();
avroEvent = flumeEvent.event();
bytePayload = avroEvent.getBody();
logRecord = new String(bytePayload.array());
System.out.println("LOG RECORD = " + logRecord);
// I was trying to write the data to hdfs here, but it fails…
}
System.out.println("Processed this batch in: " + (System.currentTimeMillis() - t1)/1000 + " seconds");
return null;
}
});
Re: Error when testing with large sparse svm
Hi xiangrui, Where can I set the spark.akka.frameSize ? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-testing-with-large-sparse-svm-tp9592p9616.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Error when testing with large sparse svm
If you use Scala, you can do:

val conf = new SparkConf()
  .setMaster("yarn-client")
  .setAppName("Logistic regression SGD fixed")
  .set("spark.akka.frameSize", "100")
  .setExecutorEnv("SPARK_JAVA_OPTS", "-Dspark.akka.frameSize=100")
var sc = new SparkContext(conf)

I have been struggling with this too. I was trying to run Spark on the KDDB dataset, which has about 29M features. It implodes and dies. Let me know if you are able to figure out how to get things to work well on really, really wide datasets. Regards, Krishna

On Mon, Jul 14, 2014 at 10:18 AM, crater cq...@ucmerced.edu wrote: Hi Xiangrui, Where can I set the spark.akka.frameSize? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-testing-with-large-sparse-svm-tp9592p9616.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Spark Streaming Json file groupby function
Hi, I am new to Spark and Scala, and I am trying to do some aggregations on a JSON file stream using Spark Streaming. I am able to parse the JSON string and it is converted to a map: (id -> 123, name -> srini, mobile -> 12324214, score -> 123, test_type -> math). Now I want to use a groupBy function on each student's map data and do some aggregations on the scores. Here is my main function:

val Array(zkQuorum, group, topics, numThreads) = args
val sparkConf = new SparkConf().setAppName("KafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(10))
// ssc.checkpoint("checkpoint")
val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
val jsonf = lines.map(JSON.parseFull(_)).map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
jsonf.print()
ssc.start()
ssc.awaitTermination()
}

Can anyone please let me know how to use the groupBy function? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
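Not from the thread, but one possible sketch of the grouping step: key each parsed record by its id and aggregate the scores per batch with the pair-DStream operations (the field names id and score are taken from the example map above):

import org.apache.spark.streaming.StreamingContext._  // enables pair DStream operations

val scoresById = jsonf.map(m => (m("id").toString, m("score").toString.toDouble))

// Total score per id within each 10-second batch.
scoresById.reduceByKey(_ + _).print()

// Or keep all scores per id for other aggregations.
scoresById.groupByKey().print()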
Trouble with spark-ec2 script: --ebs-vol-size
Hello, I'm using the spark-0.9.1-bin-hadoop1 distribution, and the ec2/spark-ec2 script within it to spin up a cluster. I tried running my processing just using the default (ephemeral) HDFS configuration, but my job errored out, saying that there was no space left. So now I'm trying to increase the size of HDFS on the cluster. My launch command: ec2/spark-ec2 -k ... -i ... -z us-east-1d -s 4 -t m3.2xlarge --ebs-vol-size=250 -m r3.2xlarge launch ... My understanding is that I should get a cluster, where each slave node has an ebs backed drive with 250 GB of storage, with a persistent HDFS set to use these slave drives. I turn off the ephemeral HDFS on the cluster master: ephemeral-hdfs/bin/stop-all.sh Then I turn on the persistent HDFS on the cluster master: persistent-hdfs/bin/start-all.sh Once I discovered the proper URL to hit the persistent name node page (not the ephemeral standard 50070 port): http://master:60070/dfshealth.jsp The page shows 4 nodes as expected, but the configured capacity shows as 31.5 GB, not the expected 1 TB (250 GB x 4) Please help! Don't be shy to let me know if I've made mis-steps, or if I'm not understanding things correctly! Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Trouble-with-spark-ec2-script-ebs-vol-size-tp9619.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Supported SQL syntax in Spark SQL
You can find the parser here: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala In general the hive parser provided by HQL is much more complete at the moment. Long term we will likely stop using parser combinators and either write a more complete parser, or adopt one from an existing project. On Mon, Jul 14, 2014 at 12:25 AM, Martin Gammelsæter martingammelsae...@gmail.com wrote: I am very interested in the original question as well, is there any list (even if it is simply in the code) of all supported syntax for Spark SQL? On Mon, Jul 14, 2014 at 6:41 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Are you sure the code running on the cluster has been updated? I launched the cluster using spark-ec2 from the 1.0.1 release, so I’m assuming that’s taken care of, at least in theory. I just spun down the clusters I had up, but I will revisit this tomorrow and provide the information you requested. Nick -- Mvh. Martin Gammelsæter 92209139
Gradient Boosted Machines
Hi, My company is strongly considering implementing a recommendation engine that is built off of statistical models using Spark. We attended the Spark Summit and were incredibly impressed with the technology and the entire community. Since then, we have been exploring the technology and determining how we could use it for our specific needs. One algorithm that we ideally want to use as part of our project is Gradient Boosted Machines. We are aware that they have not yet been implemented in MLlib and would like to submit our request that they be considered for future implementation. Additionally, we would love to see the AdaBoost algorithm implemented in MLlib and feature preprocessing implemented in Python (as it already exists for Scala). Otherwise, thank you for taking our feedback and for providing us with this incredible technology. Daniel
Re: Can we get a spark context inside a mapper
I understand that the question may seem unprofessional, but I am a newbie. If this is not the right place, could you share a link to somewhere I can ask such questions? But please answer. On Mon, Jul 14, 2014 at 6:52 PM, Rahul Bhojwani rahulbhojwani2...@gmail.com wrote: Hey, My question is for this situation: Suppose we have 10 files each containing list of features in each row. Task is that for each file cluster the features in that file and write the corresponding cluster along with it in a new file. So we have to generate 10 more files by applying clustering in each file individually. So can I do it this way, that get rdd of list of files and apply map. Inside the mapper function which will be handling each file, get another spark context and use Mllib kmeans to get the clustered output file. Please suggest the appropriate method to tackle this problem. Thanks, Rahul Kumar Bhojwani 3rd year, B.Tech Computer Science Engineering National Institute Of Technology, Karnataka 9945197359 -- Rahul K Bhojwani 3rd Year B.Tech Computer Science and Engineering National Institute of Technology, Karnataka
Re: Error when testing with large sparse svm
Hi Krishna, Thanks for your help. Are you able to get your 29M data running yet? I fix the previous problem by setting larger spark.akka.frameSize, but now I get some other errors below. Did you get these errors before? 14/07/14 11:32:20 ERROR TaskSchedulerImpl: Lost executor 1 on node7: remote Akka client disassociated 14/07/14 11:32:20 WARN TaskSetManager: Lost TID 20 (task 13.0:0) 14/07/14 11:32:21 ERROR TaskSchedulerImpl: Lost executor 3 on node8: remote Akka client disassociated 14/07/14 11:32:21 WARN TaskSetManager: Lost TID 21 (task 13.0:1) 14/07/14 11:32:23 ERROR TaskSchedulerImpl: Lost executor 6 on node3: remote Akka client disassociated 14/07/14 11:32:23 WARN TaskSetManager: Lost TID 22 (task 13.0:0) 14/07/14 11:32:25 ERROR TaskSchedulerImpl: Lost executor 0 on node4: remote Akka client disassociated 14/07/14 11:32:25 WARN TaskSetManager: Lost TID 23 (task 13.0:1) 14/07/14 11:32:26 ERROR TaskSchedulerImpl: Lost executor 5 on node1: remote Akka client disassociated 14/07/14 11:32:26 WARN TaskSetManager: Lost TID 24 (task 13.0:0) 14/07/14 11:32:28 ERROR TaskSchedulerImpl: Lost executor 7 on node6: remote Akka client disassociated 14/07/14 11:32:28 WARN TaskSetManager: Lost TID 26 (task 13.0:0) 14/07/14 11:32:28 ERROR TaskSetManager: Task 13.0:0 failed 4 times; aborting job Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 13.0:0 failed 4 times, most recent failure: TID 26 on host node6 failed for unknown reason Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1044) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1028) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1026) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1026) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:634) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1229) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-testing-with-large-sparse-svm-tp9592p9623.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
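For reference, a spark-shell style sketch of how spark.akka.frameSize is usually raised; the value is in MB, and the app name and the number 128 are assumptions, not taken from this thread.

import org.apache.spark.{SparkConf, SparkContext}

// The property has to be set before the SparkContext is created,
// or passed through spark-submit's configuration options.
val conf = new SparkConf()
  .setAppName("LargeSparseSVM")        // hypothetical app name
  .set("spark.akka.frameSize", "128")  // assumed value; tune to your task-result sizes
val sc = new SparkContext(conf)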
Re: Can we get a spark context inside a mapper
Rahul, I'm not sure what you mean by your question being very unprofessional. You can feel free to answer such questions here. You may or may not receive an answer, and you shouldn't necessarily expect to have your question answered within five hours. I've never tried to do anything like your case. I imagine the easiest thing would be to read and process each file individually, since you are intending to produce a separate result for each. You could also look at RDD.wholeTextFiles - maybe that will be of some use if your files are small - but I don't know of any corresponding save method which would generate files with different names from within a single RDD. On Mon, Jul 14, 2014 at 2:30 PM, Rahul Bhojwani rahulbhojwani2...@gmail.com wrote: I understand that the question is very unprofessional, but I am a newbie. If you could share some link where I can ask such questions, if not here. But please answer. On Mon, Jul 14, 2014 at 6:52 PM, Rahul Bhojwani rahulbhojwani2...@gmail.com wrote: Hey, My question is for this situation: Suppose we have 10 files each containing list of features in each row. Task is that for each file cluster the features in that file and write the corresponding cluster along with it in a new file. So we have to generate 10 more files by applying clustering in each file individually. So can I do it this way, that get rdd of list of files and apply map. Inside the mapper function which will be handling each file, get another spark context and use Mllib kmeans to get the clustered output file. Please suggest the appropriate method to tackle this problem. Thanks, Rahul Kumar Bhojwani 3rd year, B.Tech Computer Science Engineering National Institute Of Technology, Karnataka 9945197359 -- Rahul K Bhojwani 3rd Year B.Tech Computer Science and Engineering National Institute of Technology, Karnataka -- Daniel Siegmann, Software Developer Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001 E: daniel.siegm...@velos.io W: www.velos.io
Re: Can we get a spark context inside a mapper
You currently can't use SparkContext inside a Spark task, so in this case you'd have to call some kind of local K-means library. One example you can try to use is Weka (http://www.cs.waikato.ac.nz/ml/weka/). You can then load your text files as an RDD of strings with SparkContext.wholeTextFiles and call Weka on each one. Matei On Jul 14, 2014, at 11:30 AM, Rahul Bhojwani rahulbhojwani2...@gmail.com wrote: I understand that the question is very unprofessional, but I am a newbie. If you could share some link where I can ask such questions, if not here. But please answer. On Mon, Jul 14, 2014 at 6:52 PM, Rahul Bhojwani rahulbhojwani2...@gmail.com wrote: Hey, My question is for this situation: Suppose we have 10 files each containing list of features in each row. Task is that for each file cluster the features in that file and write the corresponding cluster along with it in a new file. So we have to generate 10 more files by applying clustering in each file individually. So can I do it this way, that get rdd of list of files and apply map. Inside the mapper function which will be handling each file, get another spark context and use Mllib kmeans to get the clustered output file. Please suggest the appropriate method to tackle this problem. Thanks, Rahul Kumar Bhojwani 3rd year, B.Tech Computer Science Engineering National Institute Of Technology, Karnataka 9945197359 -- Rahul K Bhojwani 3rd Year B.Tech Computer Science and Engineering National Institute of Technology, Karnataka
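A hedged sketch of Matei's suggestion: read whole files, cluster each one locally inside the task, and write the labelled points back out. The paths are assumptions, and the inline nearest-of-two-centers assignment is only a stand-in for a real single-machine library call such as Weka.

import org.apache.spark.{SparkConf, SparkContext}

object PerFileClustering {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("PerFileClustering"))

    // One (path, fileContents) pair per input file; each file is clustered
    // independently inside a single task -- no nested SparkContext needed.
    val clustered = sc.wholeTextFiles("hdfs:///input/features/*").map { case (path, contents) =>
      val points = contents.split("\n").filter(_.trim.nonEmpty)
        .map(_.trim.split("\\s+").map(_.toDouble))

      // Stand-in clustering: assign each point to the nearer of two fixed centers.
      // Replace this block with a call into your local clustering library (e.g. Weka).
      val labels = points.map { p =>
        val d0 = p.map(x => x * x).sum
        val d1 = p.map(x => (x - 1.0) * (x - 1.0)).sum
        if (d0 <= d1) 0 else 1
      }

      val rows = points.zip(labels).map { case (p, l) => p.mkString(",") + "\t" + l }
      (path, rows.mkString("|"))
    }

    // Simplest possible sink: one output with "path<TAB>clusteredRows" lines.
    // Writing a separate HDFS file per input path would need the FileSystem API instead.
    clustered.map { case (path, body) => path + "\t" + body }
      .saveAsTextFile("hdfs:///output/clustered")

    sc.stop()
  }
}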
Re: Error when testing with large sparse svm
That is exactly the same error that I got. I am still having no success. Regards, Krishna On Mon, Jul 14, 2014 at 11:50 AM, crater cq...@ucmerced.edu wrote: Hi Krishna, Thanks for your help. Are you able to get your 29M data running yet? I fix the previous problem by setting larger spark.akka.frameSize, but now I get some other errors below. Did you get these errors before? 14/07/14 11:32:20 ERROR TaskSchedulerImpl: Lost executor 1 on node7: remote Akka client disassociated 14/07/14 11:32:20 WARN TaskSetManager: Lost TID 20 (task 13.0:0) 14/07/14 11:32:21 ERROR TaskSchedulerImpl: Lost executor 3 on node8: remote Akka client disassociated 14/07/14 11:32:21 WARN TaskSetManager: Lost TID 21 (task 13.0:1) 14/07/14 11:32:23 ERROR TaskSchedulerImpl: Lost executor 6 on node3: remote Akka client disassociated 14/07/14 11:32:23 WARN TaskSetManager: Lost TID 22 (task 13.0:0) 14/07/14 11:32:25 ERROR TaskSchedulerImpl: Lost executor 0 on node4: remote Akka client disassociated 14/07/14 11:32:25 WARN TaskSetManager: Lost TID 23 (task 13.0:1) 14/07/14 11:32:26 ERROR TaskSchedulerImpl: Lost executor 5 on node1: remote Akka client disassociated 14/07/14 11:32:26 WARN TaskSetManager: Lost TID 24 (task 13.0:0) 14/07/14 11:32:28 ERROR TaskSchedulerImpl: Lost executor 7 on node6: remote Akka client disassociated 14/07/14 11:32:28 WARN TaskSetManager: Lost TID 26 (task 13.0:0) 14/07/14 11:32:28 ERROR TaskSetManager: Task 13.0:0 failed 4 times; aborting job Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 13.0:0 failed 4 times, most recent failure: TID 26 on host node6 failed for unknown reason Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1044) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1028) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1026) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1026) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:634) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1229) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-testing-with-large-sparse-svm-tp9592p9623.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Can we get a spark context inside a mapper
Hi there, I think the question is interesting; a spark of sparks = spark I wonder if you can use the spark job server ( https://github.com/ooyala/spark-jobserver)? So in the spark task that requires a new spark context, instead of creating it in the task, contact the job server to create one and use the data in the task as the data source either via hdfs/tachyon/s3. Wait until the sub-task is done then continue. Since the job server has the notion of job id, you might use it as a reference to the sub-task. I don't know if this is a good idea or bad one. Maybe this is an anti-pattern of spark, but maybe not. HTH, Jerry On Mon, Jul 14, 2014 at 3:09 PM, Matei Zaharia matei.zaha...@gmail.com wrote: You currently can't use SparkContext inside a Spark task, so in this case you'd have to call some kind of local K-means library. One example you can try to use is Weka (http://www.cs.waikato.ac.nz/ml/weka/). You can then load your text files as an RDD of strings with SparkContext.wholeTextFiles and call Weka on each one. Matei On Jul 14, 2014, at 11:30 AM, Rahul Bhojwani rahulbhojwani2...@gmail.com wrote: I understand that the question is very unprofessional, but I am a newbie. If you could share some link where I can ask such questions, if not here. But please answer. On Mon, Jul 14, 2014 at 6:52 PM, Rahul Bhojwani rahulbhojwani2...@gmail.com wrote: Hey, My question is for this situation: Suppose we have 10 files each containing list of features in each row. Task is that for each file cluster the features in that file and write the corresponding cluster along with it in a new file. So we have to generate 10 more files by applying clustering in each file individually. So can I do it this way, that get rdd of list of files and apply map. Inside the mapper function which will be handling each file, get another spark context and use Mllib kmeans to get the clustered output file. Please suggest the appropriate method to tackle this problem. Thanks, Rahul Kumar Bhojwani 3rd year, B.Tech Computer Science Engineering National Institute Of Technology, Karnataka 9945197359 -- Rahul K Bhojwani 3rd Year B.Tech Computer Science and Engineering National Institute of Technology, Karnataka
Re: writing FLume data to HDFS
Stepping a bit back, if you just want to write Flume data to HDFS, you can use Flume's HDFS sink for that. Trying to do this using Spark Streaming and SparkFlumeEvent is unnecessarily complex. And I guess it is tricky to write the raw bytes from the SparkFlumeEvent into a file. If you want to do it this way, I suggest trying this (not tested, pure guess work): RDD[SparkFlumeEvent] -> map to get the RDD of payload bytes -> do RDD.mapPartitions() to write each of the RDD's partitions of bytes into an HDFS file (using HDFS's file output stream interface). You will have to take care of making the file names of each partition unique, dealing with failures in writing, etc. TD

On Mon, Jul 14, 2014 at 9:29 AM, Sundaram, Muthu X. muthu.x.sundaram@sabre.com wrote: I am not sure how to write it. I tried writing to the local file system using FileWriter and PrintWriter, inside the while loop. I am able to get the text and print it, but it fails when I use regular Java classes. Shouldn't I use regular Java classes here? Can I write only to HDFS? Do I have to create the file in HDFS using the HDFS classes? I thought of using Spark's saveAsTextFile(), but I have a JavaRDD<SparkFlumeEvent>, not a JavaRDD<AvroFlumeEvent>, so I am not sure whether saveAsTextFile() will work. I appreciate any guidance here. How do I get more code examples? Books, URLs?

flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
  @Override
  public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
    String logRecord = null;
    List<SparkFlumeEvent> events = eventsData.collect();
    Iterator<SparkFlumeEvent> batchedEvents = events.iterator();
    long t1 = System.currentTimeMillis();
    AvroFlumeEvent avroEvent = null;
    ByteBuffer bytePayload = null;
    // All the user level data is carried as payload in the Flume event
    while (batchedEvents.hasNext()) {
      SparkFlumeEvent flumeEvent = batchedEvents.next();
      avroEvent = flumeEvent.event();
      bytePayload = avroEvent.getBody();
      logRecord = new String(bytePayload.array());
      System.out.println("LOG RECORD = " + logRecord);

"I was trying to write the data to HDFS, but it fails."

*From:* Tathagata Das [mailto:tathagata.das1...@gmail.com] *Sent:* Friday, July 11, 2014 1:43 PM *To:* user@spark.apache.org *Cc:* u...@spark.incubator.apache.org *Subject:* Re: writing FLume data to HDFS What is the error you are getting when you say "I was trying to write the data to HDFS, but it fails"? TD

On Thu, Jul 10, 2014 at 1:36 PM, Sundaram, Muthu X. muthu.x.sundaram@sabre.com wrote: I am new to Spark. I am trying to do the following: Netcat -> Flume -> Spark Streaming (process Flume data) -> HDFS. My Flume config file has the following set up: source = netcat, sink = avro sink. Spark Streaming code: I am able to print data from Flume to the monitor, but I am struggling to create a file. In order to get the real data I need to convert the SparkFlumeEvent to an AvroFlumeEvent. JavaRDD.saveAsTextFile() might not work, because the JavaRDD is a collection of SparkFlumeEvent. Do I need to convert it into a JavaRDD<AvroFlumeEvent>? Please share any code examples. Thanks.
Code:

Duration batchInterval = new Duration(2000);
SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);
JavaDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port);
flumeStream.count();
flumeStream.foreachRDD(new Function2<JavaRDD<SparkFlumeEvent>, JavaRDD<SparkFlumeEvent>, Void>() {
  @Override
  public Void call(JavaRDD<SparkFlumeEvent> events1, JavaRDD<SparkFlumeEvent> events2) throws Exception {
    events1.saveAsTextFile("output.txt");
    return null;
  }
});
/* flumeStream.count().map(new Function<Long, String>() {
  @Override
  public String call(Long in) {
    return "Received " + in + " flume events.";
  }
}).print(); */
flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>, Void>() {
  @Override
  public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception {
    String logRecord = null;
    List<SparkFlumeEvent> events = eventsData.collect();
    Iterator<SparkFlumeEvent> batchedEvents = events.iterator();
    long t1 = System.currentTimeMillis();
    AvroFlumeEvent avroEvent = null;
    ByteBuffer bytePayload = null;
    // All the user level data is carried as
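As a concrete illustration of the approach TD sketches above (map out the payload bytes, then write each batch to HDFS), here is a minimal Scala sketch; the host, port, and output path are assumptions, and rdd.saveAsTextFile with a per-batch directory is used in place of hand-rolled HDFS output streams.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object FlumeToHdfs {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("FlumeToHdfs")
    val ssc = new StreamingContext(conf, Seconds(2))

    // DStream[SparkFlumeEvent]; host/port are where the Flume Avro sink points.
    val flumeStream = FlumeUtils.createStream(ssc, "localhost", 41414)

    // Pull the payload bytes out of each event and turn them into text lines.
    val lines = flumeStream.map(e => new String(e.event.getBody.array()))

    // One output directory per batch, named by the batch time, so names stay unique.
    lines.foreachRDD { (rdd, time) =>
      if (rdd.count() > 0) {
        rdd.saveAsTextFile(s"hdfs:///flume-out/batch-${time.milliseconds}")
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}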
Re: Ideal core count within a single JVM
Thanks a lot for replying back. Actually, I am running the SparkPageRank example with 160GB heap (I am sure the problem is not GC because the excess time is being spent in java code only). What I have observed in Jprofiler and Oprofile outputs is that the amount of time spent in following 2 functions increases substantially with increasing N: 1) java.io.ObjectOutputStream.writeObject0 2) scala.Tuple2.hashCode I don't think that Linux file system could be causing the issue as my machine has 256GB RAM, and I am using a tmpfs for java.io.tmpdir. So, I don't think there is much disk access involved, if that is what you meant. Regards, Lokesh -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Ideal-core-count-within-a-single-JVM-tp9566p9630.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark Streaming Json file groupby function
You have to import StreamingContext._ to enable groupByKey operations on DStreams. After importing that you can apply groupByKey on any DStream, that is a DStream of key-value pairs (e.g. DStream[(String, Int)]) . The data in each pair RDDs will be grouped by the first element in the tuple as the grouping element. TD On Mon, Jul 14, 2014 at 10:59 AM, srinivas kusamsrini...@gmail.com wrote: hi I am new to spark and scala and I am trying to do some aggregations on json file stream using Spark Streaming. I am able to parse the json string and it is converted to map(id - 123, name - srini, mobile - 12324214, score - 123, test_type - math) now i want to use GROUPBY function on each student map data and wanted to do some aggregations on scores. Here is my main function val Array(zkQuorum, group, topics, numThreads) = args val sparkConf = new SparkConf().setAppName(KafkaWordCount) val ssc = new StreamingContext(sparkConf, Seconds(10)) // ssc.checkpoint(checkpoint) val topicpMap = topics.split(,).map((_,numThreads.toInt)).toMap val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2) val jsonf = lines.map(JSON.parseFull(_)).map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]]) jsonf.print() ssc.start() ssc.awaitTermination() } Can anyone please Let me know how to use groupby function..thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
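A minimal, self-contained sketch of what that import enables; the socket source and the "test_type,score" line format here are stand-ins for the Kafka/JSON stream in the original question.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._  // brings in pair-DStream operations

object ScoresByTestType {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("ScoresByTestType"), Seconds(10))

    // Stand-in source: lines of the form "test_type,score".
    val lines = ssc.socketTextStream("localhost", 9999)
    val pairs = lines.map { l =>
      val Array(testType, score) = l.split(",")
      (testType, score.toDouble)
    }

    // groupByKey yields one (key, values) pair per test_type in every batch.
    pairs.groupByKey().print()

    ssc.start()
    ssc.awaitTermination()
  }
}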
Re: All of the tasks have been completed but the Stage is still shown as Active?
Seems like it is related. Possibly those PRs that Andrew mentioned are going to fix this issue. On Fri, Jul 11, 2014 at 5:51 AM, Haopu Wang hw...@qilinsoft.com wrote: I saw some exceptions like this in driver log. Can you shed some lights? Is it related with the behaviour? 14/07/11 20:40:09 ERROR LiveListenerBus: Listener JobProgressListener threw an exception java.util.NoSuchElementException: key not found: 64019 at scala.collection.MapLike$class.default(MapLike.scala:228) at scala.collection.AbstractMap.default(Map.scala:58) at scala.collection.mutable.HashMap.apply(HashMap.scala:64) at org.apache.spark.ui.jobs.JobProgressListener.onStageCompleted(JobProgressListener.scala:78) at org.apache.spark.scheduler.SparkListenerBus$$anonfun$postToAll$2.apply(SparkListenerBus.scala:48) at org.apache.spark.scheduler.SparkListenerBus$$anonfun$postToAll$2.apply(SparkListenerBus.scala:48) at org.apache.spark.scheduler.SparkListenerBus$$anonfun$foreachListener$1.apply(SparkListenerBus.scala:81) at org.apache.spark.scheduler.SparkListenerBus$$anonfun$foreachListener$1.apply(SparkListenerBus.scala:79) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.SparkListenerBus$class.foreachListener(SparkListenerBus.scala:79) at org.apache.spark.scheduler.SparkListenerBus$class.postToAll(SparkListenerBus.scala:48) at org.apache.spark.scheduler.LiveListenerBus.postToAll(LiveListenerBus.scala:32) at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:56) at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:56) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(LiveListenerBus.scala:56) at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply(LiveListenerBus.scala:47) at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply(LiveListenerBus.scala:47) at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160) at org.apache.spark.scheduler.LiveListenerBus$$anon$1.run(LiveListenerBus.scala:46) -- *From:* Haopu Wang *Sent:* Thursday, July 10, 2014 7:38 PM *To:* user@spark.apache.org *Subject:* RE: All of the tasks have been completed but the Stage is still shown as Active? I didn't keep the driver's log. It's a lesson. I will try to run it again to see if it happens again. -- *From:* Tathagata Das [mailto:tathagata.das1...@gmail.com] *Sent:* 2014年7月10日 17:29 *To:* user@spark.apache.org *Subject:* Re: All of the tasks have been completed but the Stage is still shown as Active? Do you see any errors in the logs of the driver? On Thu, Jul 10, 2014 at 1:21 AM, Haopu Wang hw...@qilinsoft.com wrote: I'm running an App for hours in a standalone cluster. From the data injector and Streaming tab of web ui, it's running well. However, I see quite a lot of Active stages in web ui even some of them have all of their tasks completed. I attach a screenshot for your reference. Do you ever see this kind of behavior?
Re: Spark streaming - tasks and stages continue to be generated when using reduce by key
The depends on your requirements. If you want to process the 250 GB input file as a stream to emulate the stream of data, then it should be split into files (such that event ordering is maintained in those splits, if necessary). And then those splits should be moved one-by-one in the directory monitored by the streaming app. You will need to figure out the split size, etc, depending on what is your intended batch size (in terms of seconds) in the streaming app. And it doesnt really need to be a multiple of hdfs block sizes. TD On Sat, Jul 12, 2014 at 7:31 AM, M Singh mans6si...@yahoo.com wrote: Thanks TD. BTW - If I have input file ~ 250 GBs - Is there any guideline on whether to use: - a single input (250 GB) (in this case is there any max upper bound) or - split into 1000 files each of 250 MB (hdfs block size is 250 MB) or - a multiple of hdfs block size. Mans On Friday, July 11, 2014 4:38 PM, Tathagata Das tathagata.das1...@gmail.com wrote: The model for file stream is to pick up and process new files written atomically (by move) into a directory. So your file is being processed in a single batch, and then its waiting for any new files to be written into that directory. TD On Fri, Jul 11, 2014 at 11:46 AM, M Singh mans6si...@yahoo.com wrote: So, is it expected for the process to generate stages/tasks even after processing a file ? Also, is there a way to figure out the file that is getting processed and when that process is complete ? Thanks On Friday, July 11, 2014 1:51 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Whenever you need to do a shuffle=based operation like reduceByKey, groupByKey, join, etc., the system is essentially redistributing the data across the cluster and it needs to know how many parts should it divide the data into. Thats where the default parallelism is used. TD On Fri, Jul 11, 2014 at 3:16 AM, M Singh mans6si...@yahoo.com wrote: Hi TD: The input file is on hdfs. The file is approx 2.7 GB and when the process starts, there are 11 tasks (since hdfs block size is 256M) for processing and 2 tasks for reduce by key. After the file has been processed, I see new stages with 2 tasks that continue to be generated. I understand this value (2) is the default value for spark.default.parallelism but don't quite understand how is the value determined for generating tasks for reduceByKey, how is it used besides reduceByKey and what should be the optimal value for this. Thanks. On Thursday, July 10, 2014 7:24 PM, Tathagata Das tathagata.das1...@gmail.com wrote: How are you supplying the text file? On Wed, Jul 9, 2014 at 11:51 AM, M Singh mans6si...@yahoo.com wrote: Hi Folks: I am working on an application which uses spark streaming (version 1.1.0 snapshot on a standalone cluster) to process text file and save counters in cassandra based on fields in each row. I am testing the application in two modes: - Process each row and save the counter in cassandra. In this scenario after the text file has been consumed, there is no task/stages seen in the spark UI. - If instead I use reduce by key before saving to cassandra, the spark UI shows continuous generation of tasks/stages even after processing the file has been completed. I believe this is because the reduce by key requires merging of data from different partitions. But I was wondering if anyone has any insights/pointers for understanding this difference in behavior and how to avoid generating tasks/stages when there is no data (new file) available. Thanks Mans
Re: Ideal core count within a single JVM
Are you increasing the number of parallel tasks with cores as well? With more tasks there will be more data communicated and hence more calls to these functions. Unfortunately contention is kind of hard to measure, since often the result is that you see many cores idle as they're waiting on a lock. ObjectOutputStream should not lock anything, but if it's blocking on a FileOutputStream to write data, that could be a problem. Look for BLOCKED threads in a stack trace too (do jstack on your Java process and look at the TaskRunner threads). Incidentally you can probably speed this up by using Kryo serialization instead of Java (see http://spark.apache.org/docs/latest/tuning.html). That might make it less CPU-bound and it would also create less IO. Matei On Jul 14, 2014, at 12:23 PM, lokesh.gidra lokesh.gi...@gmail.com wrote: Thanks a lot for replying back. Actually, I am running the SparkPageRank example with 160GB heap (I am sure the problem is not GC because the excess time is being spent in java code only). What I have observed in Jprofiler and Oprofile outputs is that the amount of time spent in following 2 functions increases substantially with increasing N: 1) java.io.ObjectOutputStream.writeObject0 2) scala.Tuple2.hashCode I don't think that Linux file system could be causing the issue as my machine has 256GB RAM, and I am using a tmpfs for java.io.tmpdir. So, I don't think there is much disk access involved, if that is what you meant. Regards, Lokesh -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Ideal-core-count-within-a-single-JVM-tp9566p9630.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
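A minimal sketch of the Kryo switch Matei suggests; the registrator line is optional and the class name in it is hypothetical.

import org.apache.spark.{SparkConf, SparkContext}

object PageRankWithKryo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("PageRankWithKryo")
      // Use Kryo instead of Java serialization for shuffled data.
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // Optionally point at a KryoRegistrator that registers your classes:
      // .set("spark.kryo.registrator", "mypackage.MyRegistrator")
    val sc = new SparkContext(conf)
    // ... run the PageRank job as before ...
    sc.stop()
  }
}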
Re: Streaming. Cannot get socketTextStream to receive anything.
When you are sending data using simple socket code to send messages, are those messages \n delimited? If they are not, then the receiver behind socketTextStream won't identify them as separate events, and will keep buffering them. TD On Sun, Jul 13, 2014 at 10:49 PM, kytay kaiyang@gmail.com wrote: Hi Tobias I have been using local[4] to test. My problem is likely caused by the tcp host server that I am trying to emulate. I was trying to emulate the tcp host to send out messages. (although I am not sure at the moment :D) First way I tried was to use a tcp tool called Hercules. Second way was to write a simple socket code to send messages at an interval, like the one shown in #2 of my first post. I suspect the reason it doesn't work is that the messages are not flushed, so no message was received by Spark Streaming. I think I will need to do more testing to understand the behavior. I am currently not sure why nc -lk is working, and not the other tools or codes I am testing with. Regards. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Solved-Streaming-Cannot-get-socketTextStream-to-receive-anything-tp9382p9588.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
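For reference, a tiny stand-alone sender that behaves like nc -lk for this purpose: every message ends with \n and is flushed immediately, which is what socketTextStream needs in order to see separate events. The port and message contents are arbitrary.

import java.io.PrintWriter
import java.net.ServerSocket

object LineEmittingServer {
  def main(args: Array[String]): Unit = {
    val server = new ServerSocket(9999)
    val socket = server.accept()            // wait for the Spark receiver to connect
    val out = new PrintWriter(socket.getOutputStream)
    var i = 0
    while (true) {
      out.print(s"message $i\n")  // '\n' marks the end of one event
      out.flush()                 // without this the data may sit in a buffer
      i += 1
      Thread.sleep(1000)
    }
  }
}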
Re: Error in JavaKafkaWordCount.java example
Are you compiling it within Spark using Spark's recommended way (see doc web page)? Or are you compiling it in your own project? In the latter case, make sure you are using the Scala 2.10.4. TD On Sun, Jul 13, 2014 at 6:43 AM, Mahebub Sayyed mahebub...@gmail.com wrote: Hello, I am referring following example: https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java I am getting following C*ompilation Error* : \example\JavaKafkaWordCount.java:[62,70] error: cannot access ClassTag Please help me. Thanks in advance. -- *Regards,* *Mahebub Sayyed*
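If the example is compiled in your own project, a minimal build.sbt along these lines keeps the Scala version consistent with the Spark artifacts; the artifact names are real, the version numbers are assumptions matching the 1.0.x line.

// build.sbt -- minimal sketch
name := "kafka-wordcount"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming"       % "1.0.1" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.0.1"
)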
Re: Spark SQL 1.0.1 error on reading fixed length byte array
This is not supported yet, but there is a PR open to fix it: https://issues.apache.org/jira/browse/SPARK-2446 On Mon, Jul 14, 2014 at 4:17 AM, Pei-Lun Lee pl...@appier.com wrote: Hi, I am using spark-sql 1.0.1 to load parquet files generated from method described in: https://gist.github.com/massie/7224868 When I try to submit a select query with columns of type fixed length byte array, the following error pops up: 14/07/14 11:09:14 INFO scheduler.DAGScheduler: Failed to run take at basicOperators.scala:100 org.apache.spark.SparkDriverExecutionException: Execution error at org.apache.spark.scheduler.DAGScheduler.runLocallyWithinThread(DAGScheduler.scala:581) at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:559) Caused by: parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file s3n://foo/bar/part-r-0.snappy.parquet at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:177) at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:130) at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:122) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to (TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.rdd.RDD$$anonfun$27.apply(RDD.scala:989) at org.apache.spark.rdd.RDD$$anonfun$27.apply(RDD.scala:989) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083) at org.apache.spark.scheduler.DAGScheduler.runLocallyWithinThread(DAGScheduler.scala:574) ... 1 more Caused by: java.lang.ClassCastException: Expected instance of primitive converter but got org.apache.spark.sql.parquet.CatalystNativeArrayConverter at parquet.io.api.Converter.asPrimitiveConverter(Converter.java:30) at parquet.io.RecordReaderImplementation.init(RecordReaderImplementation.java:264) at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:60) at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:74) at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:110) at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:172) ... 24 more Is fixed length byte array supposed to work in this version? I noticed that other array types like int or string already work. Thanks, -- Pei-Lun
Re: can't print DStream after reduce
The problem is not really for local[1] or local. The problem arises when there are more input streams than there are cores. But I agree, for people who are just beginning to use it by running it locally, there should be a check addressing this. I made a JIRA for this. https://issues.apache.org/jira/browse/SPARK-2464 TD On Sun, Jul 13, 2014 at 4:26 PM, Sean Owen so...@cloudera.com wrote: How about a PR that rejects a context configured for local or local[1]? As I understand it is not intended to work and has bitten several people. On Jul 14, 2014 12:24 AM, Michael Campbell michael.campb...@gmail.com wrote: This almost had me not using Spark; I couldn't get any output. It is not at all obvious what's going on here to the layman (and to the best of my knowledge, not documented anywhere), but now you know you'll be able to answer this question for the numerous people that will also have it. On Sun, Jul 13, 2014 at 5:24 PM, Walrus theCat walrusthe...@gmail.com wrote: Great success! I was able to get output to the driver console by changing the construction of the Streaming Spark Context from: val ssc = new StreamingContext(local /**TODO change once a cluster is up **/, AppName, Seconds(1)) to: val ssc = new StreamingContext(local[2] /**TODO change once a cluster is up **/, AppName, Seconds(1)) I found something that tipped me off that this might work by digging through this mailing list. On Sun, Jul 13, 2014 at 2:00 PM, Walrus theCat walrusthe...@gmail.com wrote: More strange behavior: lines.foreachRDD(x = println(x.first)) // works lines.foreachRDD(x = println((x.count,x.first))) // no output is printed to driver console On Sun, Jul 13, 2014 at 11:47 AM, Walrus theCat walrusthe...@gmail.com wrote: Thanks for your interest. lines.foreachRDD(x = println(x.count)) And I got 0 every once in a while (which I think is strange, because lines.print prints the input I'm giving it over the socket.) When I tried: lines.map(_-1).reduceByKey(_+_).foreachRDD(x = println(x.count)) I got no count. Thanks On Sun, Jul 13, 2014 at 11:34 AM, Tathagata Das tathagata.das1...@gmail.com wrote: Try doing DStream.foreachRDD and then printing the RDD count and further inspecting the RDD. On Jul 13, 2014 1:03 AM, Walrus theCat walrusthe...@gmail.com wrote: Hi, I have a DStream that works just fine when I say: dstream.print If I say: dstream.map(_,1).print that works, too. However, if I do the following: dstream.reduce{case(x,y) = x}.print I don't get anything on my console. What's going on? Thanks
Re: Catalyst dependency on Spark Core
Yeah, sadly this dependency was introduced when someone consolidated the logging infrastructure. However, the dependency should be very small and thus easy to remove, and I would like catalyst to be usable outside of Spark. A pull request to make this possible would be welcome. Ideally, we'd create some sort of spark common package that has things like logging. That way catalyst could depend on that, without pulling in all of Hadoop, etc. Maybe others have opinions though, so I'm cc-ing the dev list. On Mon, Jul 14, 2014 at 12:21 AM, Yanbo Liang yanboha...@gmail.com wrote: Make Catalyst independent of Spark is the goal of Catalyst, maybe need time and evolution. I awared that package org.apache.spark.sql.catalyst.util embraced org.apache.spark.util.{Utils = SparkUtils}, so that Catalyst has a dependency on Spark core. I'm not sure whether it will be replaced by other component independent of Spark in later release. 2014-07-14 11:51 GMT+08:00 Aniket Bhatnagar aniket.bhatna...@gmail.com: As per the recent presentation given in Scala days ( http://people.apache.org/~marmbrus/talks/SparkSQLScalaDays2014.pdf), it was mentioned that Catalyst is independent of Spark. But on inspecting pom.xml of sql/catalyst module, it seems it has a dependency on Spark Core. Any particular reason for the dependency? I would love to use Catalyst outside Spark (reposted as previous email bounced. Sorry if this is a duplicate).
Re: Nested Query With Spark SQL(1.0.1)
What sort of nested query are you talking about? Right now we only support nested queries in the FROM clause. I'd like to add support for other cases in the future. On Sun, Jul 13, 2014 at 4:11 AM, anyweil wei...@gmail.com wrote: Or is it supported? I know I could do it myself with filter, but if SQL could support it, that would be much better, thx! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Nested-Query-With-Spark-SQL-1-0-1-tp9544p9547.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
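A small sketch of the supported form, i.e. a nested query in the FROM clause; the Score class, data, and the exact query are made-up illustrations against the 1.0.x API.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Hypothetical schema, just to make the example self-contained.
case class Score(name: String, subject: String, score: Int)

object NestedFromClause {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("NestedFromClause"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.createSchemaRDD

    sc.parallelize(Seq(
      Score("srini", "math", 123),
      Score("vasu", "sci", 324)
    )).registerAsTable("scores")

    // A subquery is accepted in the FROM clause (it has to be given an alias).
    val top = sqlContext.sql(
      "SELECT name, score FROM (SELECT name, score FROM scores WHERE subject = 'math') m WHERE score > 100")
    top.collect().foreach(println)

    sc.stop()
  }
}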
Memory compute-intensive tasks
I'm trying to run a job that includes an invocation of a memory compute-intensive multithreaded C++ program, and so I'd like to run one task per physical node. Using rdd.coalesce(# nodes) seems to just allocate one task per core, and so runs out of memory on the node. Is there any way to give the scheduler a hint that the task uses lots of memory and cores so it spreads it out more evenly? Thanks, Ravi Pandya Microsoft Research
Spark 1.0.1 EC2 - Launching Applications
Hi All, I've used the spark-ec2 scripts to build a simple 1.0.1 Standalone cluster on EC2. It appears that the spark-submit script is not bundled with a spark-ec2 install. Given that: What is the recommended way to execute spark jobs on a standalone EC2 cluster? Spark-submit provides extremely useful features that are still useful for EC2 deployments. We've used workarounds like modifying the spark-classpath and using run-example in the past to run simple one-time EC2 jobs. The 'Running Applications' section of the EC2-Scripts documentation does not mention how to actually submit jobs to the cluster either. Thanks! Josh
Re: Repeated data item search with Spark SQL(1.0.1)
Handling of complex types is somewhat limited in SQL at the moment. It'll be more complete if you use HiveQL. That said, the problem here is you are calling .name on an array. You need to pick an item from the array (using [..]) or use something like a lateral view explode. On Sat, Jul 12, 2014 at 11:16 PM, anyweil wei...@gmail.com wrote: Hi All: I am using Spark SQL 1.0.1 for a simple test. The loaded data (JSON format), which is registered as table people, is:

{"name":"Michael", "schools":[{"name":"ABC","time":1994},{"name":"EFG","time":2000}]}
{"name":"Andy", "age":30, "scores":{"eng":98,"phy":89}}
{"name":"Justin", "age":19}

The schools field holds repeated values of the form {"name":XXX, "time":X}. How should I write the SQL to select the people who have a school with name ABC? I have tried SELECT name FROM people WHERE schools.name = 'ABC', but it fails with:

[error] (run-main-0) org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: 'name, tree:
[error] Project ['name]
[error]  Filter ('schools.name = ABC)
[error]   Subquery people
[error]    ParquetRelation people.parquet, Some(Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml)
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: 'name, tree:
Project ['name]
 Filter ('schools.name = ABC)
  Subquery people
   ParquetRelation people.parquet, Some(Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml)
at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$apply$1.applyOrElse(Analyzer.scala:71)
...

Could anybody show me how to write the right SQL for this repeated data item search in Spark SQL? Thank you! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Repeated-data-item-search-with-Spark-SQL-1-0-1-tp9544.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
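A hedged sketch of the lateral-view route Michael mentions, written in Scala against a HiveContext; the file path is an assumption, and whether it runs as-is on 1.0.1 depends on how the JSON and its nested schema are loaded in your setup.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object SchoolsQuery {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("SchoolsQuery"))
    val hive = new HiveContext(sc)

    // Load the JSON shown above however you normally do; jsonFile + registerAsTable
    // is one option, with the path here being purely illustrative.
    hive.jsonFile("hdfs:///data/people.json").registerAsTable("people")

    // HiveQL can flatten the repeated "schools" field with LATERAL VIEW explode,
    // then filter on the exploded rows.
    val abcPeople = hive.hql(
      """SELECT name
        |FROM people LATERAL VIEW explode(schools) s AS school
        |WHERE school.name = 'ABC'""".stripMargin)
    abcPeople.collect().foreach(println)

    sc.stop()
  }
}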
Client application that calls Spark and receives an MLlib *model* Scala Object, not just result
Hello Spark community, I would like to write an application in Scala that is a model server. It should have an MLlib linear regression model that is already trained on some big set of data, and then be able to repeatedly call myLinearRegressionModel.predict() many times and return the result. Now, I want this client application to submit a job to Spark and tell the Spark cluster job to 1) train its particular MLlib model, which produces a LinearRegressionModel, and then 2) take the produced Scala org.apache.spark.mllib.regression.LinearRegressionModel *object*, serialize that object, and return this serialized object over the wire to my calling application. 3) My client application receives the serialized Scala (model) object, and can call .predict() on it over and over. I am separating the heavy lifting of training the model from doing model predictions; the client application will only do predictions using the MLlib model it received from the Spark application. The confusion I have is that I only know how to submit jobs to Spark by using the bin/spark-submit script, and then the only output I receive is stdout (as in, text). I want my Scala application to submit the Spark model-training job programmatically, and for the Spark application to return a SERIALIZED MLLIB OBJECT, not just some stdout text! How can I do this? I think my use case of separating long-running jobs out to Spark and using its libraries in another application should be a pretty common design pattern. Thanks! -- Άρης Βλασακάκης Aris Vlasakakis
Re: Client application that calls Spark and receives an MLlib *model* Scala Object, not just result
Please look at the following. https://github.com/ooyala/spark-jobserver http://en.wikipedia.org/wiki/Predictive_Model_Markup_Language https://github.com/EsotericSoftware/kryo You can train your model convert it to PMML and return that to your client OR You can train your model and write that model (serialized object) to the file system (local, HDFS, S3 etc) or a datastore and return a location back to the client on a successful write. On Mon, Jul 14, 2014 at 4:27 PM, Aris Vlasakakis a...@vlasakakis.com wrote: Hello Spark community, I would like to write an application in Scala that i a model server. It should have an MLlib Linear Regression model that is already trained on some big set of data, and then is able to repeatedly call myLinearRegressionModel.predict() many times and return the result. Now, I want this client application to submit a job to Spark and tell the Spark cluster job to 1) train its particular MLlib model, which produces a LinearRegression model, and then 2) take the produced Scala org.apache.spark.mllib.regression.LinearRegressionModel *object*, serialize that object, and return this serialized object over the wire to my calling application. 3) My client application receives the serialized Scala (model) object, and can call .predict() on it over and over. I am separating the heavy lifting of training the model and doing model predictions; the client application will only do predictions using the MLlib model it received from the Spark application. The confusion I have is that I only know how to submit jobs to Spark by using the bin/spark-submit script, and then the only output I receive is stdout (as in, text). I want my scala appliction to hopefully submit the spark model-training programmatically, and for the Spark application to return a SERIALIZED MLLIB OBJECT, not just some stdout text! How can I do this? I think my use case of separating long-running jobs to Spark and using it's libraries in another application should be a pretty common design pattern. Thanks! -- Άρης Βλασακάκης Aris Vlasakakis
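A minimal sketch of the second option mentioned above: plain Java serialization of the trained model to a file that the client reads back. The paths and toy training data are assumptions; in practice you would write to HDFS/S3 and add error handling.

import java.io.{FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionModel, LinearRegressionWithSGD}

object TrainAndPersistModel {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("TrainAndPersistModel"))

    // Toy training data; in practice this is the big training set.
    val data = sc.parallelize(Seq(
      LabeledPoint(1.0, Vectors.dense(1.0)),
      LabeledPoint(2.0, Vectors.dense(2.0)),
      LabeledPoint(3.0, Vectors.dense(3.0))))

    val model = LinearRegressionWithSGD.train(data, numIterations = 100)

    // Write the model object itself to a location the client application can read.
    val out = new ObjectOutputStream(new FileOutputStream("/tmp/lr-model.bin"))
    out.writeObject(model)
    out.close()
    sc.stop()
  }
}

// In the client application (no cluster needed for prediction):
object ModelClient {
  def main(args: Array[String]): Unit = {
    val in = new ObjectInputStream(new FileInputStream("/tmp/lr-model.bin"))
    val model = in.readObject().asInstanceOf[LinearRegressionModel]
    in.close()
    println(model.predict(Vectors.dense(2.5)))
  }
}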
Re: Memory compute-intensive tasks
I don't have a solution for you (sorry), but do note that rdd.coalesce(numNodes) keeps data on the same nodes where it was. If you set shuffle=true then it should repartition and redistribute the data. But it uses the hash partitioner according to the ScalaDoc - I don't know of any way to supply a custom partitioner. On Mon, Jul 14, 2014 at 4:09 PM, Ravi Pandya r...@iecommerce.com wrote: I'm trying to run a job that includes an invocation of a memory compute-intensive multithreaded C++ program, and so I'd like to run one task per physical node. Using rdd.coalesce(# nodes) seems to just allocate one task per core, and so runs out of memory on the node. Is there any way to give the scheduler a hint that the task uses lots of memory and cores so it spreads it out more evenly? Thanks, Ravi Pandya Microsoft Research -- Daniel Siegmann, Software Developer Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001 E: daniel.siegm...@velos.io W: www.velos.io
How to kill running spark yarn application
Hi all, A newbie question: I start a Spark YARN application through spark-submit. How do I kill this app? I can kill the YARN app with yarn application -kill appid, but the application master is still running. What's the proper way to shut down the entire app? Best, Siyuan
Re: Number of executors change during job running
Hi Tathagata, It seems repartition does not necessarily force Spark to distribute the data into different executors. I have launched a new job which uses repartition right after I received data from Kafka. For the first two batches, the reduce stage used more than 80 executors. Starting from the third batch, there were always only 2 executors in the reduce task (combineByKey). Even with the first batch which used more than 80 executors, it took 2.4 mins to finish the reduce stage for a very small amount of data. Bill On Mon, Jul 14, 2014 at 12:30 PM, Tathagata Das tathagata.das1...@gmail.com wrote: After using repartition(300), how many executors did it run on? By the way, repartitions(300) means it will divide the shuffled data into 300 partitions. Since there are many cores on each of the 300 machines/executors, these partitions (each requiring a core) may not be spread all 300 executors. Hence, if you really want spread it all 300 executors, you may have to bump up the partitions even more. However, increasing the partitions to too high may not be beneficial, and you will have play around with the number to figure out sweet spot that reduces the time to process the stage / time to process the whole batch. TD On Fri, Jul 11, 2014 at 8:32 PM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi Tathagata, Do you mean that the data is not shuffled until the reduce stage? That means groupBy still only uses 2 machines? I think I used repartition(300) after I read the data from Kafka into DStream. It seems that it did not guarantee that the map or reduce stages will be run on 300 machines. I am currently trying to initiate 100 DStream from KafkaUtils.createDStream and union them. Now the reduce stages had around 80 machines for all the batches. However, this method will introduce many dstreams. It will be good if we can control the number of executors in the groupBy operation because the calculation needs to be finished within 1 minute for different size of input data based on our production need. Thanks! Bill On Fri, Jul 11, 2014 at 7:29 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Aah, I get it now. That is because the input data streams is replicated on two machines, so by locality the data is processed on those two machines. So the map stage on the data uses 2 executors, but the reduce stage, (after groupByKey) the saveAsTextFiles would use 300 tasks. And the default parallelism takes into affect only when the data is explicitly shuffled around. You can fix this by explicitly repartitioning the data. inputDStream.repartition(partitions) This is covered in the streaming tuning guide http://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving . TD On Fri, Jul 11, 2014 at 4:11 PM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi folks, I just ran another job that only received data from Kafka, did some filtering, and then save as text files in HDFS. There was no reducing work involved. Surprisingly, the number of executors for the saveAsTextFiles stage was also 2 although I specified 300 executors in the job submission. As a result, the simple save file action took more than 2 minutes. Do you have any idea how Spark determined the number of executors for different stages? Thanks! Bill On Fri, Jul 11, 2014 at 2:01 PM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi Tathagata, Below is my main function. I omit some filtering and data conversion functions. These functions are just a one-to-one mapping, which may not possible increase running time. 
The only reduce function I have here is groupByKey. There are 4 topics in my Kafka brokers and two of the topics have 240k lines each minute. And the other two topics have less than 30k lines per minute. The batch size is one minute and I specified 300 executors in my spark-submit script. The default parallelism is 300. val parition = 300 val zkQuorum = zk1,zk2,zk3 val group = my-group- + currentTime.toString val topics = topic1,topic2,topic3,topic4 val numThreads = 4 val topicMap = topics.split(,).map((_,numThreads.toInt)).toMap ssc = new StreamingContext(conf, Seconds(batch)) ssc.checkpoint(hadoopOutput + checkpoint) val lines = lines1 lines.cache() val jsonData = lines.map(JSON.parseFull(_)) val mapData = jsonData.filter(_.isDefined) .map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]]) val validMapData = mapData.filter(isValidData(_)) val fields = validMapData.map(data = (data(id).toString, timestampToUTCUnix(data(time).toString), timestampToUTCUnix(data(local_time).toString), data(id2).toString, data(id3).toString, data(log_type).toString, data(sub_log_type).toString)) val timeDiff = 3600L val filteredFields = fields.filter(field = abs(field._2 - field._3) = timeDiff) val
Re: Memory compute-intensive tasks
I think coalesce with shuffle=true will force it to have one task per node. Without that, it might be that due to data locality it decides to launch multiple ones on the same node even though the total # of tasks is equal to the # of nodes. If this is the *only* thing you run on the cluster, you could also configure the Workers to only report one core by manually launching the spark.deploy.worker.Worker process with that flag (see http://spark.apache.org/docs/latest/spark-standalone.html). Matei On Jul 14, 2014, at 1:59 PM, Daniel Siegmann daniel.siegm...@velos.io wrote: I don't have a solution for you (sorry), but do note that rdd.coalesce(numNodes) keeps data on the same nodes where it was. If you set shuffle=true then it should repartition and redistribute the data. But it uses the hash partitioner according to the ScalaDoc - I don't know of any way to supply a custom partitioner. On Mon, Jul 14, 2014 at 4:09 PM, Ravi Pandya r...@iecommerce.com wrote: I'm trying to run a job that includes an invocation of a memory compute-intensive multithreaded C++ program, and so I'd like to run one task per physical node. Using rdd.coalesce(# nodes) seems to just allocate one task per core, and so runs out of memory on the node. Is there any way to give the scheduler a hint that the task uses lots of memory and cores so it spreads it out more evenly? Thanks, Ravi Pandya Microsoft Research -- Daniel Siegmann, Software Developer Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001 E: daniel.siegm...@velos.io W: www.velos.io
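For concreteness, a small sketch of the coalesce-with-shuffle approach, using RDD.pipe to stand in for however the external C++ program is actually invoked; the node count, paths, and binary location are assumptions.

import org.apache.spark.{SparkConf, SparkContext}

object OneTaskPerNode {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("OneTaskPerNode"))
    val numNodes = 8  // assumed cluster size; pass it in as an argument in practice

    val input = sc.textFile("hdfs:///input/data")

    // shuffle = true forces a real repartition, so the resulting partitions are
    // spread across the cluster instead of staying wherever the blocks were read.
    val onePartitionPerNode = input.coalesce(numNodes, shuffle = true)

    // Each partition is streamed through one invocation of the external program,
    // giving roughly one heavyweight process per node.
    val results = onePartitionPerNode.pipe("/usr/local/bin/heavy-program")  // hypothetical binary

    results.saveAsTextFile("hdfs:///output/data")
    sc.stop()
  }
}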
Re: Catalyst dependency on Spark Core
Yeah, I'd just add a spark-util that has these things. Matei On Jul 14, 2014, at 1:04 PM, Michael Armbrust mich...@databricks.com wrote: Yeah, sadly this dependency was introduced when someone consolidated the logging infrastructure. However, the dependency should be very small and thus easy to remove, and I would like catalyst to be usable outside of Spark. A pull request to make this possible would be welcome. Ideally, we'd create some sort of spark common package that has things like logging. That way catalyst could depend on that, without pulling in all of Hadoop, etc. Maybe others have opinions though, so I'm cc-ing the dev list. On Mon, Jul 14, 2014 at 12:21 AM, Yanbo Liang yanboha...@gmail.com wrote: Make Catalyst independent of Spark is the goal of Catalyst, maybe need time and evolution. I awared that package org.apache.spark.sql.catalyst.util embraced org.apache.spark.util.{Utils = SparkUtils}, so that Catalyst has a dependency on Spark core. I'm not sure whether it will be replaced by other component independent of Spark in later release. 2014-07-14 11:51 GMT+08:00 Aniket Bhatnagar aniket.bhatna...@gmail.com: As per the recent presentation given in Scala days (http://people.apache.org/~marmbrus/talks/SparkSQLScalaDays2014.pdf), it was mentioned that Catalyst is independent of Spark. But on inspecting pom.xml of sql/catalyst module, it seems it has a dependency on Spark Core. Any particular reason for the dependency? I would love to use Catalyst outside Spark (reposted as previous email bounced. Sorry if this is a duplicate).
Re: Spark 1.0.1 EC2 - Launching Applications
The script should be there, in the spark/bin directory. What command did you use to launch the cluster? Matei On Jul 14, 2014, at 1:12 PM, Josh Happoldt josh.happo...@trueffect.com wrote: Hi All, I've used the spark-ec2 scripts to build a simple 1.0.1 Standalone cluster on EC2. It appears that the spark-submit script is not bundled with a spark-ec2 install. Given that: What is the recommended way to execute spark jobs on a standalone EC2 cluster? Spark-submit provides extremely useful features that are still useful for EC2 deployments. We've used workarounds like modifying the spark-classpath and using run-example in the past to run simple one-time EC2 jobs. The 'Running Applications' section of the EC2-Scripts documentation does not mention how to actually submit jobs to the cluster either. Thanks! Josh
Re: Memory compute-intensive tasks
Depending on how your C++ program is designed, maybe you can feed the data from multiple partitions into the same process? Getting the results back might be tricky. But that may be the only way to guarantee you're only using one invocation per node. On Mon, Jul 14, 2014 at 5:12 PM, Matei Zaharia matei.zaha...@gmail.com wrote: I think coalesce with shuffle=true will force it to have one task per node. Without that, it might be that due to data locality it decides to launch multiple ones on the same node even though the total # of tasks is equal to the # of nodes. If this is the *only* thing you run on the cluster, you could also configure the Workers to only report one core by manually launching the spark.deploy.worker.Worker process with that flag (see http://spark.apache.org/docs/latest/spark-standalone.html). Matei On Jul 14, 2014, at 1:59 PM, Daniel Siegmann daniel.siegm...@velos.io wrote: I don't have a solution for you (sorry), but do note that rdd.coalesce(numNodes) keeps data on the same nodes where it was. If you set shuffle=true then it should repartition and redistribute the data. But it uses the hash partitioner according to the ScalaDoc - I don't know of any way to supply a custom partitioner. On Mon, Jul 14, 2014 at 4:09 PM, Ravi Pandya r...@iecommerce.com wrote: I'm trying to run a job that includes an invocation of a memory compute-intensive multithreaded C++ program, and so I'd like to run one task per physical node. Using rdd.coalesce(# nodes) seems to just allocate one task per core, and so runs out of memory on the node. Is there any way to give the scheduler a hint that the task uses lots of memory and cores so it spreads it out more evenly? Thanks, Ravi Pandya Microsoft Research -- Daniel Siegmann, Software Developer Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001 E: daniel.siegm...@velos.io W: www.velos.io -- Daniel Siegmann, Software Developer Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001 E: daniel.siegm...@velos.io W: www.velos.io
Re: pyspark sc.parallelize running OOM with smallish data
Continuing to debug with Scala, I tried this on local with enough memory (10g) and it is able to count the dataset. With more memory(for executor and driver) in a cluster it still fails. The data is about 2Gbytes. It is 30k * 4k doubles. On Sat, Jul 12, 2014 at 6:31 PM, Aaron Davidson ilike...@gmail.com wrote: I think this is probably dying on the driver itself, as you are probably materializing the whole dataset inside your python driver. How large is spark_data_array compared to your driver memory? On Fri, Jul 11, 2014 at 7:30 PM, Mohit Jaggi mohitja...@gmail.com wrote: I put the same dataset into scala (using spark-shell) and it acts weird. I cannot do a count on it, the executors seem to hang. The WebUI shows 0/96 in the status bar, shows details about the worker nodes but there is no progress. sc.parallelize does finish (takes too long for the data size) in scala. On Fri, Jul 11, 2014 at 2:00 PM, Mohit Jaggi mohitja...@gmail.com wrote: spark_data_array here has about 35k rows with 4k columns. I have 4 nodes in the cluster and gave 48g to executors. also tried kyro serialization. traceback (most recent call last): File /mohit/./m.py, line 58, in module spark_data = sc.parallelize(spark_data_array) File /mohit/spark/python/pyspark/context.py, line 265, in parallelize jrdd = readRDDFromFile(self._jsc, tempFile.name, numSlices) File /mohit/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py, line 537, in __call__ File /mohit/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py, line 300, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.readRDDFromFile. : java.lang.OutOfMemoryError: Java heap space at org.apache.spark.api.python.PythonRDD$.readRDDFromFile(PythonRDD.scala:279) at org.apache.spark.api.python.PythonRDD.readRDDFromFile(PythonRDD.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) at py4j.Gateway.invoke(Gateway.java:259) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:207) at java.lang.Thread.run(Thread.java:745)
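If the driver really is the bottleneck, two hedged workarounds (a Scala sketch; the paths and slice counts are made up, and an explicit slice count alone will not help if the array itself does not fit in the driver):

// Option 1: let executors read the data directly instead of shipping it from the driver
val rows = sc.textFile("hdfs:///data/matrix.txt", 96)
  .map(line => line.split(' ').map(_.toDouble))

// Option 2: if the data must originate in the driver, pass an explicit slice count
val localRows = Seq.tabulate(1000)(i => Array.fill(10)(i.toDouble))  // stand-in for the real 30k x 4k data
val rdd = sc.parallelize(localRows, numSlices = 96)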
Re: Spark Streaming Json file groupby function
Hi, Thanks for your reply. I imported StreamingContext and right now I am getting my DStream as something like: Map(id -> 123, name -> srini, mobile -> 12324214, score -> 123, test_type -> math) Map(id -> 321, name -> vasu, mobile -> 73942090, score -> 324, test_type -> sci) Map(id -> 432, name -> , mobile -> 423141234, score -> 322, test_type -> math) Each map collection comes from a JSON string. Now, if I want to aggregate the scores on only math, or if I want to find out who got the highest score in math (showing both name and score), what transformation should I apply to my existing DStream? I am very new to dealing with maps and DStream transformations, so please advise on how to proceed from here. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618p9656.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: SparkR failed to connect to the master
I restarted Spark Master with spark-0.9.1 and SparkR was able to communicate with the Master. I am using the latest SparkR pkg-e1f95b6. Maybe it has problem communicating to Spark 1.0.0? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-failed-to-connect-to-the-master-tp9359p9658.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Number of executors change during job running
Can you give me a screen shot of the stages page in the web ui, the spark logs, and the code that is causing this behavior. This seems quite weird to me. TD On Mon, Jul 14, 2014 at 2:11 PM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi Tathagata, It seems repartition does not necessarily force Spark to distribute the data into different executors. I have launched a new job which uses repartition right after I received data from Kafka. For the first two batches, the reduce stage used more than 80 executors. Starting from the third batch, there were always only 2 executors in the reduce task (combineByKey). Even with the first batch which used more than 80 executors, it took 2.4 mins to finish the reduce stage for a very small amount of data. Bill On Mon, Jul 14, 2014 at 12:30 PM, Tathagata Das tathagata.das1...@gmail.com wrote: After using repartition(300), how many executors did it run on? By the way, repartitions(300) means it will divide the shuffled data into 300 partitions. Since there are many cores on each of the 300 machines/executors, these partitions (each requiring a core) may not be spread all 300 executors. Hence, if you really want spread it all 300 executors, you may have to bump up the partitions even more. However, increasing the partitions to too high may not be beneficial, and you will have play around with the number to figure out sweet spot that reduces the time to process the stage / time to process the whole batch. TD On Fri, Jul 11, 2014 at 8:32 PM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi Tathagata, Do you mean that the data is not shuffled until the reduce stage? That means groupBy still only uses 2 machines? I think I used repartition(300) after I read the data from Kafka into DStream. It seems that it did not guarantee that the map or reduce stages will be run on 300 machines. I am currently trying to initiate 100 DStream from KafkaUtils.createDStream and union them. Now the reduce stages had around 80 machines for all the batches. However, this method will introduce many dstreams. It will be good if we can control the number of executors in the groupBy operation because the calculation needs to be finished within 1 minute for different size of input data based on our production need. Thanks! Bill On Fri, Jul 11, 2014 at 7:29 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Aah, I get it now. That is because the input data streams is replicated on two machines, so by locality the data is processed on those two machines. So the map stage on the data uses 2 executors, but the reduce stage, (after groupByKey) the saveAsTextFiles would use 300 tasks. And the default parallelism takes into affect only when the data is explicitly shuffled around. You can fix this by explicitly repartitioning the data. inputDStream.repartition(partitions) This is covered in the streaming tuning guide http://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving . TD On Fri, Jul 11, 2014 at 4:11 PM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi folks, I just ran another job that only received data from Kafka, did some filtering, and then save as text files in HDFS. There was no reducing work involved. Surprisingly, the number of executors for the saveAsTextFiles stage was also 2 although I specified 300 executors in the job submission. As a result, the simple save file action took more than 2 minutes. Do you have any idea how Spark determined the number of executors for different stages? Thanks! 
Bill On Fri, Jul 11, 2014 at 2:01 PM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi Tathagata, Below is my main function. I omit some filtering and data conversion functions. These functions are just a one-to-one mapping, which should not meaningfully increase the running time. The only reduce function I have here is groupByKey. There are 4 topics in my Kafka brokers; two of the topics have 240k lines each minute and the other two topics have less than 30k lines per minute. The batch size is one minute and I specified 300 executors in my spark-submit script. The default parallelism is 300.

val partition = 300
val zkQuorum = "zk1,zk2,zk3"
val group = "my-group-" + currentTime.toString
val topics = "topic1,topic2,topic3,topic4"
val numThreads = 4
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
ssc = new StreamingContext(conf, Seconds(batch))
ssc.checkpoint(hadoopOutput + "checkpoint")
val lines = lines1
lines.cache()
val jsonData = lines.map(JSON.parseFull(_))
val mapData = jsonData.filter(_.isDefined)
  .map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
val validMapData = mapData.filter(isValidData(_))
val fields = validMapData.map(data => (data("id").toString, timestampToUTCUnix(data("time").toString), timestampToUTCUnix(data("local_time").toString),
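For readers following the thread, a minimal sketch of the explicit repartition TD recommends, placed right after the Kafka receive (the partition count and the key extraction are illustrative; it assumes the ssc, zkQuorum, group, and topicMap values from the code above):

import org.apache.spark.streaming.StreamingContext._   // pair-DStream operations such as reduceByKey

val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

// Redistribute the received blocks across the cluster before any heavy work
val repartitioned = lines.repartition(300)

val counts = repartitioned
  .map(record => (record.split(",")(0), 1L))   // example key: first comma-separated field, a placeholder for the app's real key
  .reduceByKey(_ + _)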
Re: Spark Streaming Json file groupby function
In general it may be a better idea to actually convert the records from hashmaps to a specific data structure. Say

case class Record(id: Int, name: String, mobile: String, score: Int, test_type: String, ... )

Then you should be able to do something like

val records = jsonf.map(m => convertMapToRecord(m))

Then to filter only math results you can do records.filter(r => r.test_type == "math"). If you have to do aggregations (sum, max, etc.) you have to figure out whether you want to aggregate in every batch, or aggregate over a window of time. If you want to do it for each batch, then

filteredRecords.foreachRDD(rdd => {
  // get aggregates for each batch
})

If you want to do it across a window of time (say 1 minute), then

filteredRecords.window(Minutes(1)).foreachRDD(rdd => {
  // get aggregates over the last 1 minute, every 10 seconds (since 10 seconds is the batch interval)
})

On Mon, Jul 14, 2014 at 3:06 PM, srinivas kusamsrini...@gmail.com wrote: Hi, Thanks for your reply. I imported StreamingContext and right now I am getting my DStream as something like: Map(id -> 123, name -> srini, mobile -> 12324214, score -> 123, test_type -> math) Map(id -> 321, name -> vasu, mobile -> 73942090, score -> 324, test_type -> sci) Map(id -> 432, name -> , mobile -> 423141234, score -> 322, test_type -> math) Each map collection comes from a JSON string. Now, if I want to aggregate the scores on only math, or if I want to find out who got the highest score in math (showing both name and score), what transformation should I apply to my existing DStream? I am very new to dealing with maps and DStream transformations, so please advise on how to proceed from here. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618p9661.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
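To make the windowed-aggregation path concrete, a small sketch under the same assumptions (a Record case class and a records DStream as above; the window length and output are illustrative, and the usual streaming imports and context are assumed):

import org.apache.spark.streaming.Minutes

val mathRecords = records.filter(_.test_type == "math")

// Highest math score (name and score) over the last 1-minute window, recomputed each batch
mathRecords
  .map(r => (r.name, r.score))
  .window(Minutes(1))
  .foreachRDD { rdd =>
    val top = rdd.top(1)(Ordering.by(_._2))   // empty array if no math records arrived in the window
    top.foreach { case (name, score) => println("best math score: " + name + " -> " + score) }
  }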
import org.apache.spark.streaming.twitter._ in Shell
I'm using spark 1.0.0 (three weeks old build of latest). Along the lines of this tutorial http://ampcamp.berkeley.edu/big-data-mini-course/realtime-processing-with-spark-streaming.html , I want to read some tweets from twitter. When trying to execute in the Spark-Shell, I get The tutorial builds an app via sbt/sbt. Are there any special requirements for importing the TwitterUtils in the shell? Best regards, Simon -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/import-org-apache-spark-streaming-twitter-in-Shell-tp9665.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: import org.apache.spark.streaming.twitter._ in Shell
The Twitter functionality is not available through the shell. 1) We separated this non-core functionality into separate subprojects so that their dependencies do not collide with or pollute those of core Spark. 2) A shell is not really the best way to start a long-running stream. It's best to use Twitter through a separate project. TD On Mon, Jul 14, 2014 at 3:47 PM, durin m...@simon-schaefer.net wrote: I'm using spark 1.0.0 (three weeks old build of latest). Along the lines of this tutorial http://ampcamp.berkeley.edu/big-data-mini-course/realtime-processing-with-spark-streaming.html , I want to read some tweets from twitter. When trying to execute in the Spark-Shell, I get The tutorial builds an app via sbt/sbt. Are there any special requirements for importing the TwitterUtils in the shell? Best regards, Simon -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/import-org-apache-spark-streaming-twitter-in-Shell-tp9665.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Change when loading/storing String data using Parquet
I just wanted to send out a quick note about a change in the handling of strings when loading / storing data using parquet and Spark SQL. Before, Spark SQL did not support binary data in Parquet, so all binary blobs were implicitly treated as Strings. 9fe693 https://github.com/apache/spark/commit/9fe693b5b6ed6af34ee1e800ab89c8a11991ea38 fixes this limitation by adding support for binary data. However, data written out with a prior version of Spark SQL will be missing the annotation telling us to interpret a given column as a String, so old string data will now be loaded as binary data. If you would like to use the data as a string, you will need to add a CAST to convert the datatype. New string data written out after this change, will correctly be loaded in as a string as now we will include an annotation about the desired type. Additionally, this should now interoperate correctly with other systems that write Parquet data (hive, thrift, etc). Michael
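As an illustration of the CAST workaround for data written by an older version (a sketch; the path, table, and column names are made up):

val oldData = sqlContext.parquetFile("hdfs:///warehouse/old_strings.parquet")
oldData.registerAsTable("old_data")

// A 'name' column written before this change now loads as binary,
// so cast it back to a string explicitly when querying.
val asStrings = sqlContext.sql("SELECT CAST(name AS STRING) AS name FROM old_data")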
SQL + streaming
Hi All, A couple of days ago I tried to integrate SQL and streaming together. My understanding is that I can transform the RDDs from a DStream into SchemaRDDs and execute SQL on each RDD, but I had no luck. Would you guys help me take a look at my code? Thank you very much!

object KafkaSpark {
  def main(args: Array[String]): Unit = {
    if (args.length < 4) {
      System.err.println("Usage: KafkaSpark zkQuorum group topics numThreads")
      System.exit(1)
    }
    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaSpark")
    val ssc = new StreamingContext(sparkConf, Seconds(10))
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    // ssc.checkpoint("checkpoint")
    // Importing the SQL context gives access to all the SQL functions and implicit conversions.
    import sqlContext._
    val tt = Time(1)
    val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val recordsStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(t => getRecord(t._2.split("#")))
    val result = recordsStream.foreachRDD((recRDD, tt) => {
      recRDD.registerAsTable("records")
      val result = sql("select * from records")
      println(result)
      result.foreach(println)
    })
    ssc.start()
    ssc.awaitTermination()
  }

  def getRecord(l: Array[String]): Record = {
    println("Getting the record")
    Record(l(0), l(1))
  }
}
Re: Stateful RDDs?
Trying to answer your questions as concisely as possible: 1. In the current implementation, the entire state RDD needs to be loaded for any update. It is a known limitation that we want to overcome in the future. Therefore the state DStream should not be persisted to disk, as all the data in the state RDDs is touched in every batch. Since Spark Streaming is not really a dedicated data store, it's not really designed to separate out hot data and cold data. 2. For each key, you could maintain in the state a timestamp of when it was last updated, and accordingly return None to filter that state out. Regarding filtering by the minimum key, there may be a way to periodically figure out the minimum key at the driver, then propagate that information out to the executors (update a static variable in the executors) and use it to filter out the keys. Hope this helps. TD On Thu, Jul 10, 2014 at 10:25 AM, Sargun Dhillon sar...@sargun.me wrote: So, one portion of our Spark Streaming application requires some state. Our application takes a bunch of application events (i.e. user_session_started, user_session_ended, etc.), calculates metrics from these, and writes them to a serving layer (see: Lambda Architecture). Two related events can be ingested into the streaming context moments apart, or an indeterminate time apart. Given this, and the fact that our normal windows pump data out every 500-1 ms, with a step of 500 ms, you might end up with two related pieces of data across two windows. In order to work around this, we use updateStateByKey to persist state, as opposed to persisting key intermediate state in some external system, since building a system to handle the complexities of (concurrent, idempotent) updates, as well as ensuring scalability, is non-trivial. The question I have around this is: even with a highly partitionable dataset, what are the upper scalability limits with stateful DStreams? If I have a dataset starting at around 10 million keys, growing at that rate monthly, what are the complexities involved? Most of the data is cold. I realize that I can remove data from the stateful DStream by sending (key, null) to it, but there is not necessarily an easy way of knowing when the last update is coming in (unless there is some way in Spark of saying "wait N windows and then send this tuple", or "wait until all keys in the upstream DStreams smaller than M are processed before sending such a tuple"). Additionally, given that my data is partitionable by datetime, does it make sense to have a custom datetime partitioner and just persist the DStream to disk, to ensure that its RDDs are only pulled off of disk (into memory) occasionally? What's the cost of having a bunch of relatively large, stateful RDDs around on disk? Does Spark have to load / deserialize the entire RDD to update one key?
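For point 2, a minimal sketch of timestamp-based expiry with updateStateByKey (the state class, the expiry age, and the eventsByKey stream are illustrative assumptions, not details from the thread):

case class SessionState(count: Long, lastUpdated: Long)

val maxAgeMs = 6 * 60 * 60 * 1000L   // assumption: drop keys untouched for 6 hours

def updateFunc(newEvents: Seq[Long], state: Option[SessionState]): Option[SessionState] = {
  val now = System.currentTimeMillis()
  if (newEvents.isEmpty) {
    // Returning None removes the key from the state DStream
    state.filter(s => now - s.lastUpdated < maxAgeMs)
  } else {
    Some(SessionState(state.map(_.count).getOrElse(0L) + newEvents.sum, now))
  }
}

// eventsByKey: DStream[(String, Long)] built elsewhere; updateStateByKey also requires ssc.checkpoint(...) to be set
val sessionCounts = eventsByKey.updateStateByKey[SessionState](updateFunc _)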
Spark-Streaming collect/take functionality.
Hello everyone, I'm an undergrad working on a summarization project. I've created a summarizer in normal Spark and it works great, however I want to write it for Spark Streaming to increase its functionality. Basically I take in a bunch of text and get the most popular words as well as the most popular bi-grams (two words together), and I've managed to do this with streaming (and made it stateful, which is great). However the next part of my algorithm requires me to get the top 10 words and top 10 bigrams and store them in a vector-like structure. With just Spark I would use code like:

array_of_words = words.sortByKey().top(50)

Is there a way to mimic this with streaming? I was following along with the ampcamp tutorial http://ampcamp.berkeley.edu/big-data-mini-course/realtime-processing-with-spark-streaming.html so I know that you can print the top 10 by using:

sortedCounts.foreach(rdd => println("\nTop 10 hashtags:\n" + rdd.take(10).mkString("\n")))

However I can't seem to alter this to make it store the top 10, just print them. The instructor mentions at the end that one can get the top 10 hashtags in each partition, collect them together at the driver and then find the top 10 hashtags among them, but they leave it as an exercise. I would appreciate any help :) Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-collect-take-functionality-tp9670.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: How to kill running spark yarn application
Then yarn application -kill appid should work. This is what I did 2 hours ago. Sorry I cannot provide more help. Sent from my iPhone On 14 Jul, 2014, at 6:05 pm, hsy...@gmail.com hsy...@gmail.com wrote: yarn-cluster On Mon, Jul 14, 2014 at 2:44 PM, Jerry Lam chiling...@gmail.com wrote: Hi Siyuan, I wonder if you --master yarn-cluster or yarn-client? Best Regards, Jerry On Mon, Jul 14, 2014 at 5:08 PM, hsy...@gmail.com hsy...@gmail.com wrote: Hi all, A newbie question, I start a spark yarn application through spark-submit How do I kill this app. I can kill the yarn app by yarn application -kill appid but the application master is still running. What's the proper way to shutdown the entire app? Best, Siyuan
Re: How to kill running spark yarn application
Before yarn application -kill, if you do jps you'll see a list of SparkSubmit and ApplicationMaster processes. After you use yarn application -kill, you only kill the SparkSubmit. On Mon, Jul 14, 2014 at 4:29 PM, Jerry Lam chiling...@gmail.com wrote: Then yarn application -kill appid should work. This is what I did 2 hours ago. Sorry I cannot provide more help. Sent from my iPhone On 14 Jul, 2014, at 6:05 pm, hsy...@gmail.com hsy...@gmail.com wrote: yarn-cluster On Mon, Jul 14, 2014 at 2:44 PM, Jerry Lam chiling...@gmail.com wrote: Hi Siyuan, I wonder if you --master yarn-cluster or yarn-client? Best Regards, Jerry On Mon, Jul 14, 2014 at 5:08 PM, hsy...@gmail.com hsy...@gmail.com wrote: Hi all, A newbie question, I start a spark yarn application through spark-submit How do I kill this app. I can kill the yarn app by yarn application -kill appid but the application master is still running. What's the proper way to shutdown the entire app? Best, Siyuan
Re: SQL + streaming
Could you elaborate on what is the problem you are facing? Compiler error? Runtime error? Class-not-found error? Not receiving any data from Kafka? Receiving data but SQL command throwing error? No errors but no output either? TD On Mon, Jul 14, 2014 at 4:06 PM, hsy...@gmail.com hsy...@gmail.com wrote: Hi All, Couple days ago, I tried to integrate SQL and streaming together. My understanding is I can transform RDD from Dstream to schemaRDD and execute SQL on each RDD. But I got no luck Would you guys help me take a look at my code? Thank you very much! object KafkaSpark { def main(args: Array[String]): Unit = { if (args.length 4) { System.err.println(Usage: KafkaSpark zkQuorum group topics numThreads) System.exit(1) } val Array(zkQuorum, group, topics, numThreads) = args val sparkConf = new SparkConf().setAppName(KafkaSpark) val ssc = new StreamingContext(sparkConf, Seconds(10)) val sc = new SparkContext(sparkConf) val sqlContext = new SQLContext(sc); //ssc.checkpoint(checkpoint) // Importing the SQL context gives access to all the SQL functions and implicit conversions. import sqlContext._ val tt = Time(1) val topicpMap = topics.split(,).map((_,numThreads.toInt)).toMap val recordsStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(t = getRecord(t._2.split(#))) val result = recordsStream.foreachRDD((recRDD, tt)={ recRDD.registerAsTable(records) val result = sql(select * from records) println(result) result.foreach(println) }) ssc.start() ssc.awaitTermination() } def getRecord(l:Array[String]):Record = { println(Getting the record) Record(l(0), l(1))} }
Re: Spark-Streaming collect/take functionality.
Why doesn't something like this work? If you want a continuously updated reference to the top counts, you can use a global variable.

var topCounts: Array[(String, Int)] = null
sortedCounts.foreachRDD { rdd =>
  val currentTopCounts = rdd.take(10)
  // print currentTopCounts or whatever
  topCounts = currentTopCounts
}

TD On Mon, Jul 14, 2014 at 4:11 PM, jon.burns jon.bu...@uleth.ca wrote: Hello everyone, I'm an undergrad working on a summarization project. I've created a summarizer in normal Spark and it works great, however I want to write it for Spark Streaming to increase its functionality. Basically I take in a bunch of text and get the most popular words as well as the most popular bi-grams (two words together), and I've managed to do this with streaming (and made it stateful, which is great). However the next part of my algorithm requires me to get the top 10 words and top 10 bigrams and store them in a vector-like structure. With just Spark I would use code like: array_of_words = words.sortByKey().top(50) Is there a way to mimic this with streaming? I was following along with the ampcamp tutorial http://ampcamp.berkeley.edu/big-data-mini-course/realtime-processing-with-spark-streaming.html so I know that you can print the top 10 by using: sortedCounts.foreach(rdd => println("\nTop 10 hashtags:\n" + rdd.take(10).mkString("\n"))) However I can't seem to alter this to make it store the top 10, just print them. The instructor mentions at the end that one can get the top 10 hashtags in each partition, collect them together at the driver and then find the top 10 hashtags among them, but they leave it as an exercise. I would appreciate any help :) Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-collect-take-functionality-tp9670.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
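For the per-partition exercise mentioned in the tutorial, a rough sketch (assuming sortedCounts contains (word, count) pairs as above; the approach and names are illustrative):

sortedCounts.foreachRDD { rdd =>
  // Take the top 10 inside each partition first, so only a handful of
  // candidates per partition travel back to the driver.
  val candidates = rdd.mapPartitions { iter =>
    iter.toSeq.sortBy(-_._2).take(10).iterator
  }
  val top10 = candidates.collect().sortBy(-_._2).take(10)
  println(top10.mkString("\n"))
}

rdd.top(10)(Ordering.by(_._2)) does essentially the same thing in a single call.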
Re: Possible bug in Spark Streaming :: TextFileStream
Oh yes, this was a bug and it has been fixed. Checkout from the master branch! https://issues.apache.org/jira/browse/SPARK-2362?jql=project%20%3D%20SPARK%20AND%20resolution%20%3D%20Unresolved%20AND%20component%20%3D%20Streaming%20ORDER%20BY%20created%20DESC%2C%20priority%20ASC TD On Mon, Jul 7, 2014 at 7:11 AM, Luis Ángel Vicente Sánchez langel.gro...@gmail.com wrote: I have a basic spark streaming job that is watching a folder, processing any new file and updating a column family in cassandra using the new cassandra-spark-driver. I think there is a problem with SparkStreamingContext.textFileStream... if I start my job in local mode with no files in the folder that is watched and then I copy a bunch of files, sometimes spark is continually processing those files again and again. I have noticed that it usually happens when spark doesn't detect all new files in one go... i.e. I copied 6 files and spark detected 3 of them as new and processed them; then it detected the other 3 as new and processed them. After it finished to process all 6 files, it detected again the first 3 files as new files and processed them... then the other 3... and again... and again... and again. Should I rise a JIRA issue? Regards, Luis
Re: SQL + streaming
No errors but no output either... Thanks! On Mon, Jul 14, 2014 at 4:59 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Could you elaborate on what is the problem you are facing? Compiler error? Runtime error? Class-not-found error? Not receiving any data from Kafka? Receiving data but SQL command throwing error? No errors but no output either? TD On Mon, Jul 14, 2014 at 4:06 PM, hsy...@gmail.com hsy...@gmail.com wrote: Hi All, Couple days ago, I tried to integrate SQL and streaming together. My understanding is I can transform RDD from Dstream to schemaRDD and execute SQL on each RDD. But I got no luck Would you guys help me take a look at my code? Thank you very much! object KafkaSpark { def main(args: Array[String]): Unit = { if (args.length 4) { System.err.println(Usage: KafkaSpark zkQuorum group topics numThreads) System.exit(1) } val Array(zkQuorum, group, topics, numThreads) = args val sparkConf = new SparkConf().setAppName(KafkaSpark) val ssc = new StreamingContext(sparkConf, Seconds(10)) val sc = new SparkContext(sparkConf) val sqlContext = new SQLContext(sc); //ssc.checkpoint(checkpoint) // Importing the SQL context gives access to all the SQL functions and implicit conversions. import sqlContext._ val tt = Time(1) val topicpMap = topics.split(,).map((_,numThreads.toInt)).toMap val recordsStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(t = getRecord(t._2.split(#))) val result = recordsStream.foreachRDD((recRDD, tt)={ recRDD.registerAsTable(records) val result = sql(select * from records) println(result) result.foreach(println) }) ssc.start() ssc.awaitTermination() } def getRecord(l:Array[String]):Record = { println(Getting the record) Record(l(0), l(1))} }
Re: import org.apache.spark.streaming.twitter._ in Shell
Thanks. Can I see that a Class is not available in the shell somewhere in the API Docs or do I have to find out by trial and error? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/import-org-apache-spark-streaming-twitter-in-Shell-tp9665p9678.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Error when testing with large sparse svm
Is it on a standalone server? There are several settings worthing checking: 1) number of partitions, which should match the number of cores 2) driver memory (you can see it from the executor tab of the Spark WebUI and set it with --driver-memory 10g 3) the version of Spark you were running Best, Xiangrui On Mon, Jul 14, 2014 at 12:14 PM, Srikrishna S srikrishna...@gmail.com wrote: That is exactly the same error that I got. I am still having no success. Regards, Krishna On Mon, Jul 14, 2014 at 11:50 AM, crater cq...@ucmerced.edu wrote: Hi Krishna, Thanks for your help. Are you able to get your 29M data running yet? I fix the previous problem by setting larger spark.akka.frameSize, but now I get some other errors below. Did you get these errors before? 14/07/14 11:32:20 ERROR TaskSchedulerImpl: Lost executor 1 on node7: remote Akka client disassociated 14/07/14 11:32:20 WARN TaskSetManager: Lost TID 20 (task 13.0:0) 14/07/14 11:32:21 ERROR TaskSchedulerImpl: Lost executor 3 on node8: remote Akka client disassociated 14/07/14 11:32:21 WARN TaskSetManager: Lost TID 21 (task 13.0:1) 14/07/14 11:32:23 ERROR TaskSchedulerImpl: Lost executor 6 on node3: remote Akka client disassociated 14/07/14 11:32:23 WARN TaskSetManager: Lost TID 22 (task 13.0:0) 14/07/14 11:32:25 ERROR TaskSchedulerImpl: Lost executor 0 on node4: remote Akka client disassociated 14/07/14 11:32:25 WARN TaskSetManager: Lost TID 23 (task 13.0:1) 14/07/14 11:32:26 ERROR TaskSchedulerImpl: Lost executor 5 on node1: remote Akka client disassociated 14/07/14 11:32:26 WARN TaskSetManager: Lost TID 24 (task 13.0:0) 14/07/14 11:32:28 ERROR TaskSchedulerImpl: Lost executor 7 on node6: remote Akka client disassociated 14/07/14 11:32:28 WARN TaskSetManager: Lost TID 26 (task 13.0:0) 14/07/14 11:32:28 ERROR TaskSetManager: Task 13.0:0 failed 4 times; aborting job Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 13.0:0 failed 4 times, most recent failure: TID 26 on host node6 failed for unknown reason Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1044) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1028) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1026) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1026) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:634) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1229) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-testing-with-large-sparse-svm-tp9592p9623.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
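For reference, a sketch of applying the settings Xiangrui lists programmatically (the values are placeholders to tune, not recommendations from the thread, and the data path is made up):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.util.MLUtils

val conf = new SparkConf()
  .setAppName("LargeSparseSVM")
  .set("spark.akka.frameSize", "128")        // MB; raise if serialized task results exceed the default
  .set("spark.default.parallelism", "64")    // roughly the total number of cores in the cluster
// Driver memory has to be set before the JVM starts, e.g. spark-submit --driver-memory 10g
val sc = new SparkContext(conf)

val data = MLUtils.loadLibSVMFile(sc, "hdfs:///data/train.libsvm")
  .repartition(64)   // match the number of partitions to the available cores
  .cache()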
Re: SparkR failed to connect to the master
I tried installing the latest Spark 1.0.1 and SparkR couldn't find the master either. I restarted with Spark 0.9.1 and SparkR was able to find the master. So, there seemed to be something that changed after Spark 1.0.0. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-failed-to-connect-to-the-master-tp9359p9680.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: SQL + streaming
Can you make sure you are running locally on more than 1 local cores? You could set the master in the SparkConf as conf.setMaster(local[4]). Then see if there are jobs running on every batch of data in the Spark web ui (running on localhost:4040). If you still dont get any output, try first simple printing recRDD.count() in the foreachRDD (that is, first test spark streaming). If you can get that to work, then I would test the Spark SQL stuff. TD On Mon, Jul 14, 2014 at 5:25 PM, hsy...@gmail.com hsy...@gmail.com wrote: No errors but no output either... Thanks! On Mon, Jul 14, 2014 at 4:59 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Could you elaborate on what is the problem you are facing? Compiler error? Runtime error? Class-not-found error? Not receiving any data from Kafka? Receiving data but SQL command throwing error? No errors but no output either? TD On Mon, Jul 14, 2014 at 4:06 PM, hsy...@gmail.com hsy...@gmail.com wrote: Hi All, Couple days ago, I tried to integrate SQL and streaming together. My understanding is I can transform RDD from Dstream to schemaRDD and execute SQL on each RDD. But I got no luck Would you guys help me take a look at my code? Thank you very much! object KafkaSpark { def main(args: Array[String]): Unit = { if (args.length 4) { System.err.println(Usage: KafkaSpark zkQuorum group topics numThreads) System.exit(1) } val Array(zkQuorum, group, topics, numThreads) = args val sparkConf = new SparkConf().setAppName(KafkaSpark) val ssc = new StreamingContext(sparkConf, Seconds(10)) val sc = new SparkContext(sparkConf) val sqlContext = new SQLContext(sc); //ssc.checkpoint(checkpoint) // Importing the SQL context gives access to all the SQL functions and implicit conversions. import sqlContext._ val tt = Time(1) val topicpMap = topics.split(,).map((_,numThreads.toInt)).toMap val recordsStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(t = getRecord(t._2.split(#))) val result = recordsStream.foreachRDD((recRDD, tt)={ recRDD.registerAsTable(records) val result = sql(select * from records) println(result) result.foreach(println) }) ssc.start() ssc.awaitTermination() } def getRecord(l:Array[String]):Record = { println(Getting the record) Record(l(0), l(1))} }
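A minimal way to split the test in two, along the lines TD suggests (only the master string and the count print differ from the code quoted above; local[4] is just an example core count):

val sparkConf = new SparkConf().setAppName("KafkaSpark").setMaster("local[4]")
val ssc = new StreamingContext(sparkConf, Seconds(10))
// ... build recordsStream from KafkaUtils.createStream exactly as before ...
recordsStream.foreachRDD { rdd =>
  // First confirm that data is arriving at all, before layering Spark SQL on top
  println("records in this batch: " + rdd.count())
}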
Re: import org.apache.spark.streaming.twitter._ in Shell
I guess this is not clearly documented. At a high level, any class that is in the package org.apache.spark.streaming.XXX where XXX is in { twitter, kafka, flume, zeromq, mqtt } is not available in the Spark shell. I have added this to the larger JIRA of things-to-add-to-streaming-docs https://issues.apache.org/jira/browse/SPARK-2419 Thanks for bringing this to attention. TD On Mon, Jul 14, 2014 at 5:53 PM, durin m...@simon-schaefer.net wrote: Thanks. Can I see that a Class is not available in the shell somewhere in the API Docs or do I have to find out by trial and error? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/import-org-apache-spark-streaming-twitter-in-Shell-tp9665p9678.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: import org.apache.spark.streaming.twitter._ in Shell
On Mon, Jul 14, 2014 at 6:52 PM, Tathagata Das tathagata.das1...@gmail.com wrote: The twitter functionality is not available through the shell. I've been processing Tweets live from the shell, though not for a long time. That's how I uncovered the problem with the Twitter receiver not deregistering, btw. Did I misunderstand your comment? Nick
Re: Error when testing with large sparse svm
I am running Spark 1.0.1 on a 5 node yarn cluster. I have set the driver memory to 8G and executor memory to about 12G. Regards, Krishna On Mon, Jul 14, 2014 at 5:56 PM, Xiangrui Meng men...@gmail.com wrote: Is it on a standalone server? There are several settings worthing checking: 1) number of partitions, which should match the number of cores 2) driver memory (you can see it from the executor tab of the Spark WebUI and set it with --driver-memory 10g 3) the version of Spark you were running Best, Xiangrui On Mon, Jul 14, 2014 at 12:14 PM, Srikrishna S srikrishna...@gmail.com wrote: That is exactly the same error that I got. I am still having no success. Regards, Krishna On Mon, Jul 14, 2014 at 11:50 AM, crater cq...@ucmerced.edu wrote: Hi Krishna, Thanks for your help. Are you able to get your 29M data running yet? I fix the previous problem by setting larger spark.akka.frameSize, but now I get some other errors below. Did you get these errors before? 14/07/14 11:32:20 ERROR TaskSchedulerImpl: Lost executor 1 on node7: remote Akka client disassociated 14/07/14 11:32:20 WARN TaskSetManager: Lost TID 20 (task 13.0:0) 14/07/14 11:32:21 ERROR TaskSchedulerImpl: Lost executor 3 on node8: remote Akka client disassociated 14/07/14 11:32:21 WARN TaskSetManager: Lost TID 21 (task 13.0:1) 14/07/14 11:32:23 ERROR TaskSchedulerImpl: Lost executor 6 on node3: remote Akka client disassociated 14/07/14 11:32:23 WARN TaskSetManager: Lost TID 22 (task 13.0:0) 14/07/14 11:32:25 ERROR TaskSchedulerImpl: Lost executor 0 on node4: remote Akka client disassociated 14/07/14 11:32:25 WARN TaskSetManager: Lost TID 23 (task 13.0:1) 14/07/14 11:32:26 ERROR TaskSchedulerImpl: Lost executor 5 on node1: remote Akka client disassociated 14/07/14 11:32:26 WARN TaskSetManager: Lost TID 24 (task 13.0:0) 14/07/14 11:32:28 ERROR TaskSchedulerImpl: Lost executor 7 on node6: remote Akka client disassociated 14/07/14 11:32:28 WARN TaskSetManager: Lost TID 26 (task 13.0:0) 14/07/14 11:32:28 ERROR TaskSetManager: Task 13.0:0 failed 4 times; aborting job Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 13.0:0 failed 4 times, most recent failure: TID 26 on host node6 failed for unknown reason Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1044) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1028) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1026) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1026) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:634) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1229) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-testing-with-large-sparse-svm-tp9592p9623.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Ideal core count within a single JVM
BTW you can see the number of parallel tasks in the application UI (http://localhost:4040) or in the log messages (e.g. when it says progress: 17/20, that means there are 20 tasks total and 17 are done). Spark will try to use at least one task per core in local mode so there might be more of them here, but if your file is big it will also have at least one task per 32 MB block of the file. Matei On Jul 14, 2014, at 6:39 PM, Matei Zaharia matei.zaha...@gmail.com wrote: I see, so here might be the problem. With more cores, there's less memory available per core, and now many of your threads are doing external hashing (spilling data to disk), as evidenced by the calls to ExternalAppendOnlyMap.spill. Maybe with 10 threads, there was enough memory per task to do all its hashing there. It's true though that these threads appear to be CPU-bound, largely due to Java Serialization. You could get this to run quite a bit faster using Kryo. However that won't eliminate the issue of spilling here. Matei On Jul 14, 2014, at 1:02 PM, lokesh.gidra lokesh.gi...@gmail.com wrote: I am only playing with 'N' in local[N]. I thought that by increasing N, Spark will automatically use more parallel tasks. Isn't it so? Can you please tell me how can I modify the number of parallel tasks? For me, there are hardly any threads in BLOCKED state in jstack output. In 'top' I see my application consuming all the 48 cores all the time with N=48. I am attaching two jstack outputs that I took will the application was running. Lokesh lessoutput3.lessoutput3 http://apache-spark-user-list.1001560.n3.nabble.com/file/n9640/lessoutput3.lessoutput3 lessoutput4.lessoutput4 http://apache-spark-user-list.1001560.n3.nabble.com/file/n9640/lessoutput4.lessoutput4 -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Ideal-core-count-within-a-single-JVM-tp9566p9640.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
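A sketch of enabling Kryo as Matei suggests (the registrator class is a hypothetical example; setting spark.serializer alone is enough to switch serializers):

import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator
import com.esotericsoftware.kryo.Kryo

// Hypothetical registrator for the application's own classes
class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[Array[Double]])
  }
}

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "MyRegistrator")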
SPARK_WORKER_PORT (standalone cluster)
Hi Spark! What is the purpose of the randomly assigned SPARK_WORKER_PORT? From the documentation it says it is used to join a cluster, but it's not clear to me how a random port could be used to communicate with other members of a Spark pool. This question might be grounded in my ignorance ... if so, please just point me to the right documentation if I'm missing something obvious :) thanks ! -- jay vyas
jsonRDD: NoSuchMethodError
Hi, I am using Spark 1.0.1. I am using the following piece of code to parse a json file. It is based on the code snippet in the SparkSQL programming guide. However, the program fails at runtime with an error stating:

java.lang.NoSuchMethodError: org.apache.spark.sql.SQLContext.jsonRDD(Lorg/apache/spark/rdd/RDD;)Lorg/apache/spark/sql/SchemaRDD;

I get a similar error for jsonFile() as well. I have included the spark-sql 1.0.1 jar when building my program using sbt. What is the right library to import for jsonRDD and jsonFile? thanks

import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.json

object SQLExample {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("JsonExample")
    val sc = new SparkContext(sparkConf)
    val sqlc = new org.apache.spark.sql.SQLContext(sc)
    val jrdd = sc.textFile(args(0)).filter(r => r.trim != "")
    val data = sqlc.jsonRDD(jrdd)
    data.printSchema()
  }
}

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/jsonRDD-NoSuchMethodError-tp9688.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: spark1.0.1 catalyst transform filter not push down
I use queryPlan.queryExecution.analyzed to get the logical plan. it works. And What you explained to me is very useful. Thank you very much. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark1-0-1-catalyst-transform-filter-not-push-down-tp9599p9689.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: ---cores option in spark-shell
They do not work in the new 1.0.1 either. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/cores-option-in-spark-shell-tp6809p9690.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: jsonRDD: NoSuchMethodError
Have you upgraded the cluster where you are running this 1.0.1 as well? A NoSuchMethodError almost always means that the class files available at runtime are different from those that were there when you compiled your program. On Mon, Jul 14, 2014 at 7:06 PM, SK skrishna...@gmail.com wrote: Hi, I am using Spark 1.0.1. I am using the following piece of code to parse a json file. It is based on the code snippet in the SparkSQL programming guide. However, the compiler outputs an error stating: java.lang.NoSuchMethodError: org.apache.spark.sql.SQLContext.jsonRDD(Lorg/apache/spark/rdd/RDD;)Lorg/apache/spark/sql/SchemaRDD; I get a similar error for jsonFile() as well. I have included the spark-sql 1.0.1 jar when building my program using sbt. What is the right library to import for jsonRDD and jsonFile? thanks import org.apache.spark._ import org.apache.spark.sql._ import org.apache.spark.sql.json object SQLExample{ def main(args : Array[String]) { val sparkConf = new SparkConf().setAppName(JsonExample) val sc = new SparkContext(sparkConf) val sqlc = new org.apache.spark.sql.SQLContext(sc) val jrdd = sc.textFile(args(0)).filter(r= r.trim != ) val data = sqlc.jsonRDD(jrdd) data.printSchema() } } -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/jsonRDD-NoSuchMethodError-tp9688.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
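One common source of this particular error is compiling against one Spark version and running against another; a build.sbt sketch that pins both artifacts to 1.0.1 and marks them provided so the cluster's own jars are used at runtime:

// build.sbt (sketch)
scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.0.1" % "provided",
  "org.apache.spark" %% "spark-sql"  % "1.0.1" % "provided"
)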
Re: ---cores option in spark-shell
Yes, the documentation is actually a little outdated. We will get around to fixing it shortly. Please use --driver-cores or --executor-cores instead. 2014-07-14 19:10 GMT-07:00 cjwang c...@cjwang.us: They do not work in the new 1.0.1 either. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/cores-option-in-spark-shell-tp6809p9690.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: import org.apache.spark.streaming.twitter._ in Shell
Did you make any updates in Spark version recently, after which you noticed this problem? Because if you were using Spark 0.8 and below, then twitter would have worked in the Spark shell. In Spark 0.9, we moved those dependencies out of the core spark for those to update more freely without raising dependency-related concerns into the core of spark streaming. TD On Mon, Jul 14, 2014 at 6:29 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: On Mon, Jul 14, 2014 at 6:52 PM, Tathagata Das tathagata.das1...@gmail.com wrote: The twitter functionality is not available through the shell. I've been processing Tweets live from the shell, though not for a long time. That's how I uncovered the problem with the Twitter receiver not deregistering, btw. Did I misunderstand your comment? Nick
Re: import org.apache.spark.streaming.twitter._ in Shell
If we're talking about the issue you captured in SPARK-2464 https://issues.apache.org/jira/browse/SPARK-2464, then it was a newly launched EC2 cluster on 1.0.1. On Mon, Jul 14, 2014 at 10:48 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Did you make any updates in Spark version recently, after which you noticed this problem? Because if you were using Spark 0.8 and below, then twitter would have worked in the Spark shell. In Spark 0.9, we moved those dependencies out of the core spark for those to update more freely without raising dependency-related concerns into the core of spark streaming. TD On Mon, Jul 14, 2014 at 6:29 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: On Mon, Jul 14, 2014 at 6:52 PM, Tathagata Das tathagata.das1...@gmail.com wrote: The twitter functionality is not available through the shell. I've been processing Tweets live from the shell, though not for a long time. That's how I uncovered the problem with the Twitter receiver not deregistering, btw. Did I misunderstand your comment? Nick
RACK_LOCAL Tasks Failed to finish
Hi all, When running GraphX applications on Spark, the task scheduler may schedule some tasks to be executed on RACK_LOCAL executors, but in that case the tasks hang, repeatedly printing log information like the following:

14-07-14 15:59:14 INFO [Executor task launch worker-6] BlockFetcherIterator$BasicBlockFetcherIterator: Started 1 remote fetches in 3 ms
14-07-14 15:59:14 INFO [Executor task launch worker-1] BlockManager: Found block rdd_29_38 locally
14-07-14 15:59:14 INFO [Executor task launch worker-1] BlockManager: Found block rdd_29_38 locally
14-07-14 15:59:14 INFO [Executor task launch worker-1] BlockManager: Found block rdd_29_38 locally
14-07-14 15:59:14 INFO [Executor task launch worker-1] BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329
14-07-14 15:59:14 INFO [Executor task launch worker-1] BlockFetcherIterator$BasicBlockFetcherIterator: Getting 300 non-empty blocks out of 300 blocks
14-07-14 15:59:14 INFO [Executor task launch worker-1] BlockFetcherIterator$BasicBlockFetcherIterator: Started 1 remote fetches in 3 ms
14-07-14 15:59:14 INFO [Executor task launch worker-0] BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329
14-07-14 15:59:14 INFO [Executor task launch worker-0] BlockFetcherIterator$BasicBlockFetcherIterator: Getting 300 non-empty blocks out of 300 blocks
14-07-14 15:59:14 INFO [Executor task launch worker-0] BlockFetcherIterator$BasicBlockFetcherIterator: Started 1 remote fetches in 3 ms
14-07-14 15:59:14 INFO [Executor task launch worker-2] BlockManager: Found block rdd_29_2 locally
14-07-14 15:59:14 INFO [Executor task launch worker-2] BlockManager: Found block rdd_29_2 locally
14-07-14 15:59:14 INFO [Executor task launch worker-2] BlockManager: Found block rdd_29_2 locally

BlockManager's `get` and `getMultiple` are being called continually, and I don't know why. Are there some RDDs being recomputed? Thanks for your help. Qiping
Re: Announcing Spark 1.0.1
Hi, congratulations on the release! I'm always pleased to see how features pop up in new Spark versions that I had added for myself in a very hackish way before (such as JSON support for Spark SQL). I am wondering if there is any good way to learn early about what is going to be in upcoming versions, except than tracking JIRA...? Tobias On Tue, Jul 15, 2014 at 12:50 AM, Philip Ogren philip.og...@oracle.com wrote: Hi Patrick, This is great news but I nearly missed the announcement because it had scrolled off the folder view that I have Spark users list messages go to. 40+ new threads since you sent the email out on Friday evening. You might consider having someone on your team create a spark-announcement list so that it is easier to disseminate important information like this release announcement. Thanks again for all your hard work. I know you and the rest of the team are getting a million requests a day Philip On 07/11/2014 07:35 PM, Patrick Wendell wrote: I am happy to announce the availability of Spark 1.0.1! This release includes contributions from 70 developers. Spark 1.0.0 includes fixes across several areas of Spark, including the core API, PySpark, and MLlib. It also includes new features in Spark's (alpha) SQL library, including support for JSON data and performance and stability fixes. Visit the release notes[1] to read about this release or download[2] the release today. [1] http://spark.apache.org/releases/spark-release-1-0-1.html [2] http://spark.apache.org/downloads.html
branch-1.0-jdbc on EC2?
I'm wondering if anyone has had success with an EC2 deployment of the https://github.com/apache/spark/tree/branch-1.0-jdbc branch that Michael Armbrust referenced in his Unified Data Access with Spark SQL talk (http://spark-summit.org/2014/talk/performing-advanced-analytics-on-relational-data-with-spark-sql-2) at Spark Summit 2014. Is the spark-ec2.py script set up for this branch? I don't see how to build the tgz file that the master and worker init.sh scripts pull from github. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/branch-1-0-jdbc-on-EC2-tp9698.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: hdfs replication on saving RDD
Eager to know about this issue too. Does anyone know how? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/hdfs-replication-on-saving-RDD-tp289p9700.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: hdfs replication on saving RDD
You can change this setting through SparkContext.hadoopConfiguration, or put the conf/ directory of your Hadoop installation on the CLASSPATH when you launch your app so that it reads the config values from there. Matei On Jul 14, 2014, at 8:06 PM, valgrind_girl 124411...@qq.com wrote: eager to know this issue too,does any one knows how? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/hdfs-replication-on-saving-RDD-tp289p9700.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
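A small sketch of the first option Matei mentions (the replication factor and paths are illustrative):

// Applies to files Spark writes through this SparkContext
sc.hadoopConfiguration.set("dfs.replication", "2")

val rdd = sc.parallelize(1 to 100)
rdd.saveAsTextFile("hdfs:///output/results")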
truly bizarre behavior with local[n] on Spark 1.0.1
Hi, I've got a socketTextStream through which I'm reading input. I have three DStreams, all of which are the same window operation over that socketTextStream. I have a four-core machine. As we've been covering lately, I have to give a cores parameter to my StreamingSparkContext:

ssc = new StreamingContext("local[4]" /** TODO change once a cluster is up **/, "AppName", Seconds(1))

Now, I have three DStreams, and all I ask them to do is print or count. I should preface this with the statement that they all work on their own.

dstream1 // 1 second window
dstream2 // 2 second window
dstream3 // 5 minute window

If I construct the ssc with local[8], and put these statements in this order, I get prints on the first one, and zero counts on the second one:

ssc("local[8]") // hyperthread dat sheezy
dstream1.print // works
dstream2.count.print // always prints 0

If I do this, this happens:

ssc("local[4]")
dstream1.print // doesn't work, just gives me the Time: ms message
dstream2.count.print // doesn't work, prints 0

ssc("local[6]")
dstream1.print // doesn't work, just gives me the Time: ms message
dstream2.count.print // works, prints 1

Sometimes these results switch up, seemingly at random. How can I get things to the point where I can develop and test my application locally? Thanks
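One general Spark Streaming consideration worth checking here (not stated in the thread itself): each receiver created by socketTextStream occupies one of the local[n] threads for its entire lifetime, so n has to be strictly larger than the number of receivers or no slots remain for processing. A minimal sketch with a single receiver:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("LocalStreamingTest").setMaster("local[4]")
val ssc = new StreamingContext(conf, Seconds(1))

// The socket receiver permanently takes one of the 4 local threads;
// the remaining threads run the batch jobs for all derived DStreams.
val lines = ssc.socketTextStream("localhost", 9999)
lines.count().print()

ssc.start()
ssc.awaitTermination()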
Re: import org.apache.spark.streaming.twitter._ in Shell
Oh right, that could have happened only after Spark 1.0.0. So let me clarify. At some point, you were able to access TwitterUtils from spark shell using Spark 1.0.0+ ? If yes, then what change in Spark caused it to not work any more? TD On Mon, Jul 14, 2014 at 7:52 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: If we're talking about the issue you captured in SPARK-2464 https://issues.apache.org/jira/browse/SPARK-2464, then it was a newly launched EC2 cluster on 1.0.1. On Mon, Jul 14, 2014 at 10:48 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Did you make any updates in Spark version recently, after which you noticed this problem? Because if you were using Spark 0.8 and below, then twitter would have worked in the Spark shell. In Spark 0.9, we moved those dependencies out of the core spark for those to update more freely without raising dependency-related concerns into the core of spark streaming. TD On Mon, Jul 14, 2014 at 6:29 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: On Mon, Jul 14, 2014 at 6:52 PM, Tathagata Das tathagata.das1...@gmail.com wrote: The twitter functionality is not available through the shell. I've been processing Tweets live from the shell, though not for a long time. That's how I uncovered the problem with the Twitter receiver not deregistering, btw. Did I misunderstand your comment? Nick
Re: Error when testing with large sparse svm
(1) What is number of partitions? Is it number of workers per node? (2) I already set the driver memory pretty big, which is 25g. (3) I am running Spark 1.0.1 in standalone cluster with 9 nodes, 1 one them works as master, others are workers. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-testing-with-large-sparse-svm-tp9592p9706.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Possible bug in Spark Streaming :: TextFileStream
Hi Team, Does this issue affect the JavaStreamingContext.textFileStream(hdfsfolderpath) API as well? Please confirm. If yes, could you please help me fix this issue? I'm using the Spark 1.0.0 version. Regards, Rajesh On Tue, Jul 15, 2014 at 5:42 AM, Tathagata Das tathagata.das1...@gmail.com wrote: Oh yes, this was a bug and it has been fixed. Checkout from the master branch! https://issues.apache.org/jira/browse/SPARK-2362?jql=project%20%3D%20SPARK%20AND%20resolution%20%3D%20Unresolved%20AND%20component%20%3D%20Streaming%20ORDER%20BY%20created%20DESC%2C%20priority%20ASC TD On Mon, Jul 7, 2014 at 7:11 AM, Luis Ángel Vicente Sánchez langel.gro...@gmail.com wrote: I have a basic spark streaming job that is watching a folder, processing any new file and updating a column family in cassandra using the new cassandra-spark-driver. I think there is a problem with SparkStreamingContext.textFileStream... if I start my job in local mode with no files in the folder that is watched and then I copy a bunch of files, sometimes spark is continually processing those files again and again. I have noticed that it usually happens when spark doesn't detect all new files in one go... i.e. I copied 6 files and spark detected 3 of them as new and processed them; then it detected the other 3 as new and processed them. After it finished to process all 6 files, it detected again the first 3 files as new files and processed them... then the other 3... and again... and again... and again. Should I rise a JIRA issue? Regards, Luis