Re: Number of executors change during job running
If I understand correctly, you cannot change the number of executors at runtime, right? (Correct me if I am wrong.) It is defined when we start the application and stays fixed. Do you mean the number of tasks?

On Fri, Jul 11, 2014 at 6:29 AM, Tathagata Das tathagata.das1...@gmail.com wrote:

Can you try setting the number of partitions explicitly in all the shuffle-based DStream operations? It may be that the default parallelism (that is, spark.default.parallelism) is not being respected. Regarding the unusual delay, I would look at the task details of that stage in the Spark web UI. It will show the breakdown of time for each task, including GC times, etc. That might give some indication. TD

On Thu, Jul 10, 2014 at 5:13 PM, Bill Jay bill.jaypeter...@gmail.com wrote:

Hi Tathagata, I set the default parallelism to 300 in my configuration file. Sometimes there are more executors in a job. However, it is still slow, and I further observed that most executors take less than 20 seconds but two of them take much longer, such as 2 minutes. The data size is very small (less than 480k lines with only 4 fields). I am not sure why the group-by operation takes more than 3 minutes. Thanks! Bill

On Thu, Jul 10, 2014 at 4:28 PM, Tathagata Das tathagata.das1...@gmail.com wrote:

Are you specifying the number of reducers in all the DStream ...ByKey operations? If the reduce-by-key is not set, the number of reducers used in the stages can keep changing across batches. TD

On Wed, Jul 9, 2014 at 4:05 PM, Bill Jay bill.jaypeter...@gmail.com wrote:

Hi all, I have a Spark Streaming job running on YARN. It consumes data from Kafka and groups the data by a certain field. The data size is 480k lines per minute, and the batch size is 1 minute. For some batches, the program sometimes takes more than 3 minutes to finish the groupBy operation, which seems slow to me. I allocated 300 workers and specified 300 as the partition number for the groupBy.

When I checked the slow stage (combineByKey at ShuffledDStream.scala:42), there were sometimes only 2 executors allocated for this stage. However, during other batches, the executors can number several hundred for the same stage, which means the number of executors for the same operations changes. Does anyone know how Spark allocates the number of executors for different stages, and how to increase the efficiency of the tasks? Thanks! Bill
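TD's suggestion above, passing the partition count explicitly to every shuffle-based DStream operation, might look like the following in the Java API. This is only a sketch, not code from the thread: it assumes a Spark Streaming 1.0 application with an existing JavaPairDStream named "pairs" (an illustrative name), and it is not runnable without a Spark cluster.

```java
// Sketch only: "pairs" is a hypothetical JavaPairDStream<String, Integer>
// inside an existing Spark Streaming 1.0 application.
int numPartitions = 300;

// Passing the count explicitly pins the number of reduce tasks per batch,
// instead of leaving it to spark.default.parallelism:
JavaPairDStream<String, Iterable<Integer>> grouped =
    pairs.groupByKey(numPartitions);

JavaPairDStream<String, Integer> summed =
    pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer a, Integer b) {
            return a + b;
        }
    }, numPartitions);
```

Both groupByKey and reduceByKey accept a numPartitions overload in the 1.0 Java API, which is what "specifying the number of reducers" refers to here.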
Re: Spark Streaming with Kafka NoClassDefFoundError
The easiest fix would be adding the Kafka jars to the SparkContext while creating it. Thanks Best Regards

On Fri, Jul 11, 2014 at 4:39 AM, Dilip dilip_ram...@hotmail.com wrote:

Hi, I am trying to run a program with Spark Streaming using Kafka on a standalone system. These are my details: Spark 1.0.0 hadoop2, Scala 2.10.3. I am trying a simple program using my custom sbt project, but this is the error I am getting:

    Exception in thread "main" java.lang.NoClassDefFoundError: kafka/serializer/StringDecoder
        at org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:55)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:94)
        at org.apache.spark.streaming.kafka.KafkaUtils.createStream(KafkaUtils.scala)
        at SimpleJavaApp.main(SimpleJavaApp.java:40)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:303)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    Caused by: java.lang.ClassNotFoundException: kafka.serializer.StringDecoder
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
        ... 11 more

Here is my .sbt file:

    name := "Simple Project"

    version := "1.0"

    scalaVersion := "2.10.3"

    libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0"
    libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.0.0"
    libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.0.0"
    libraryDependencies += "org.apache.spark" %% "spark-examples" % "1.0.0"
    libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.0.0"
    libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.0"

    resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
    resolvers += "Maven Repository" at "http://central.maven.org/maven2/"

sbt package was successful. I also tried sbt ++2.10.3 package to build it for my Scala version. The problem remains the same. Can anyone help me out here? I've been stuck on this for quite some time now. Thank You, Dilip
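One way to act on Akhil's advice: since sbt package builds a jar without its dependencies, the Kafka-related jars can be shipped to the driver and executors with the --jars flag of spark-submit (available in Spark 1.0). This is only a sketch; the jar file names, versions, and paths below are illustrative, not taken from the thread.

```
# Illustrative: spark-streaming-kafka and the Kafka client (plus Kafka's
# zkclient and metrics-core dependencies) are not part of the Spark assembly,
# so they must be supplied on the classpath explicitly.
spark-submit \
  --class SimpleJavaApp \
  --jars spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.0.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar \
  target/scala-2.10/simple-project_2.10-1.0.jar
```

The alternative, building a single fat jar with sbt assembly or a Maven shade plugin, is discussed later in this thread.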
Re: Streaming. Cannot get socketTextStream to receive anything.
You simply use the nc command to do this, like: nc -p 12345. It will open port 12345, and from the terminal you can provide whatever input you require for your streaming code. Thanks Best Regards

On Fri, Jul 11, 2014 at 2:41 AM, kytay kaiyang@gmail.com wrote:

Hi, I am learning Spark Streaming and am trying out the JavaNetworkCount example.

#1 - This is the code I wrote:

    JavaStreamingContext sctx = new JavaStreamingContext("local", appName, new Duration(5000));
    JavaReceiverInputDStream<String> lines = sctx.socketTextStream("127.0.0.1", );
    JavaDStream<String> words = lines.flatMap(
        new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String arg0) throws Exception {
                System.out.println("Print text: " + arg0);
                return Arrays.asList(arg0.split(" "));
            }
        });

#2 - This is the socket code I am using:

    import java.io.BufferedReader;
    import java.io.DataOutputStream;
    import java.io.InputStreamReader;
    import java.net.ServerSocket;
    import java.net.Socket;

    public class TestTcpServer {
        public static void main(String argv[]) throws Exception {
            String clientSentence;
            String capitalizedSentence;
            ServerSocket welcomeSocket = new ServerSocket();
            int i = 0;
            while (true) {
                Socket connectionSocket = welcomeSocket.accept();
                BufferedReader inFromClient = new BufferedReader(
                    new InputStreamReader(connectionSocket.getInputStream()));
                DataOutputStream outToClient =
                    new DataOutputStream(connectionSocket.getOutputStream());
                while (true) {
                    String sendingStr = "Sending... data... " + i;
                    outToClient.writeBytes(sendingStr);
                    System.out.println(sendingStr);
                    i++;
                    Thread.sleep(3000);
                }
            }
        }
    }

What I am trying to do is to get the JavaNetworkCount in #1 to start printing all the text I am receiving, but so far I have failed to achieve that. I have been using Hercules Setup http://www.hw-group.com/products/hercules/details_en.html to simulate a TCP server, as well as the simple server-socket code in #2, but I am not seeing any text being printed on the console. Is public Iterable<String> call(String arg0) throws Exception being called every 5 secs? The console log is at http://pastebin.com/THzdzGhg

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Cannot-get-socketTextStream-to-receive-anything-tp9382.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
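For what it's worth, two details in the server code above could each explain the silence: new ServerSocket() binds no port (a port must be given, matching the one passed to socketTextStream), and writeBytes never sends a newline, while socketTextStream splits the byte stream into records on '\n', so no complete line ever arrives. A minimal corrected sketch (the class name and message text are illustrative, not from the thread):

```java
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

public class LineStreamServer {
    /**
     * Accepts one client and writes n newline-terminated lines.
     * socketTextStream treats each '\n'-terminated chunk as one record,
     * so every println below becomes one element of the DStream.
     */
    public static void serve(ServerSocket welcomeSocket, int n) throws Exception {
        try (Socket conn = welcomeSocket.accept();
             PrintWriter out = new PrintWriter(
                     new OutputStreamWriter(conn.getOutputStream()), true)) {
            for (int i = 0; i < n; i++) {
                out.println("Sending... data... " + i); // '\n' appended, then auto-flushed
            }
        }
    }
}
```

A driver would bind an explicit port, e.g. serve(new ServerSocket(12345), 100), paired with sctx.socketTextStream("127.0.0.1", 12345) on the Spark side.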
Re: Number of executors change during job running
Hi Praveen, I did not change the number of total executors. I specified 300 as the number of executors when I submitted the jobs. However, for some stages, the number of executors is very small, leading to a long calculation time even for a small data set. That means not all executors were used for some stages. When I went into the details of the running times of the different executors, I found that some of them had a very low running time while a few had a very long running time, leading to a long overall running time.

Another point I noticed is that the number of completed tasks is usually larger than the number of total tasks. That means sometimes the job is still running in some stages although all the tasks have finished. These are the two behaviors I observed that may be related to the wrong running time. Bill

On Thu, Jul 10, 2014 at 11:26 PM, Praveen Seluka psel...@qubole.com wrote:

If I understand correctly, you cannot change the number of executors at runtime, right? (Correct me if I am wrong.) It is defined when we start the application and stays fixed. Do you mean the number of tasks? [...]
Re: Streaming. Cannot get socketTextStream to receive anything.
Sorry, the command is: nc -lk 12345. Thanks Best Regards

On Fri, Jul 11, 2014 at 6:46 AM, Akhil Das ak...@sigmoidanalytics.com wrote:

You simply use the nc command to do this, like: nc -p 12345. It will open port 12345, and from the terminal you can provide whatever input you require for your streaming code. Thanks Best Regards

On Fri, Jul 11, 2014 at 2:41 AM, kytay kaiyang@gmail.com wrote:

Hi, I am learning Spark Streaming and am trying out the JavaNetworkCount example. [...]
Re: Spark Streaming with Kafka NoClassDefFoundError
Hi Akhil, Can you please guide me through this? Because the code I am running already has this in it:

    SparkContext sc = new SparkContext();
    sc.addJar("/usr/local/spark/external/kafka/target/scala-2.10/spark-streaming-kafka_2.10-1.1.0-SNAPSHOT.jar");

Is there something I am missing? Thanks, Dilip

On Friday 11 July 2014 12:02 PM, Akhil Das wrote:

The easiest fix would be adding the Kafka jars to the SparkContext while creating it. Thanks Best Regards

On Fri, Jul 11, 2014 at 4:39 AM, Dilip dilip_ram...@hotmail.com wrote:

Hi, I am trying to run a program with Spark Streaming using Kafka on a standalone system. These are my details: Spark 1.0.0 hadoop2, Scala 2.10.3. [...]
Re: Join two Spark Streaming
Hi Tathagata, Thanks for the solution. Actually, I will use the number of unique integers in the batch instead of the accumulative number of unique integers. I do have two questions about your code:

1. Why do we need uniqueValuesRDD? Why do we need to call uniqueValuesRDD.checkpoint()?
2. Where is distinctValues defined?

Bill

On Thu, Jul 10, 2014 at 8:46 PM, Tathagata Das tathagata.das1...@gmail.com wrote:

Do you want to continuously maintain the set of unique integers seen since the beginning of the stream?

    var uniqueValuesRDD: RDD[Int] = ...

    dstreamOfIntegers.transform(newDataRDD => {
      val newUniqueValuesRDD = newDataRDD.union(distinctValues).distinct
      uniqueValuesRDD = newUniqueValuesRDD

      // periodically call uniqueValuesRDD.checkpoint()

      val uniqueCount = uniqueValuesRDD.count()
      newDataRDD.map(x => x / count)
    })

On Tue, Jul 8, 2014 at 11:03 AM, Bill Jay bill.jaypeter...@gmail.com wrote:

Hi all, I am working on a pipeline that needs to join two Spark streams. The input is a stream of integers. The output is the number of each integer's appearances divided by the total number of unique integers. Suppose the input is:

    1 2 3 1 2 2

There are 3 unique integers, and 1 appears twice. Therefore, the output for the integer 1 will be:

    1 0.67

Since the input is from a stream, it seems we need to first join the appearances of the integers with the total number of unique integers and then do the calculation using map. I am thinking of adding a dummy key to both streams and using join. However, a Cartesian product matches the application here better. How can I do this effectively? Thanks! Bill
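The per-batch arithmetic being discussed (each integer's appearances divided by the count of unique integers in that batch) can be checked with plain Java collections. This sketch covers only the batch-local logic, not the Spark wiring; the class and method names are made up for illustration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BatchRatios {
    /** Maps each integer in the batch to appearances / number-of-unique-integers. */
    public static Map<Integer, Double> ratios(List<Integer> batch) {
        Map<Integer, Integer> counts = new HashMap<>();
        for (int x : batch) {
            counts.merge(x, 1, Integer::sum); // count appearances of each value
        }
        double unique = counts.size();        // unique integers in this batch
        Map<Integer, Double> out = new HashMap<>();
        for (Map.Entry<Integer, Integer> e : counts.entrySet()) {
            out.put(e.getKey(), e.getValue() / unique);
        }
        return out;
    }
}
```

For the input 1 2 3 1 2 2 from the question, the value 1 maps to 2/3 ≈ 0.67, matching the expected output. In Spark this logic would sit inside the transform over each batch RDD, as in TD's sketch.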
Re: Spark Streaming with Kafka NoClassDefFoundError
I have met similar issues. The reason is probably that spark-streaming-kafka is not included in the Spark assembly. Currently, I am using Maven to generate a shaded package with all the dependencies. You may try to use sbt assembly to include the dependencies in your jar file. Bill

On Thu, Jul 10, 2014 at 11:48 PM, Dilip dilip_ram...@hotmail.com wrote:

Hi Akhil, Can you please guide me through this? Because the code I am running already has this in it:

    SparkContext sc = new SparkContext();
    sc.addJar("/usr/local/spark/external/kafka/target/scala-2.10/spark-streaming-kafka_2.10-1.1.0-SNAPSHOT.jar");

Is there something I am missing? [...]
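A sketch of what Bill's sbt assembly suggestion could look like for an sbt 0.13-era build. The plugin version and settings below are illustrative, not taken from the thread.

```scala
// project/plugins.sbt (plugin version is illustrative):
//   addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")

// build.sbt: spark-core is already inside the Spark assembly on the cluster,
// so mark it "provided"; the Kafka artifacts stay compile-scoped and are
// bundled into the fat jar, which fixes the NoClassDefFoundError.
import AssemblyKeys._

assemblySettings

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0" % "provided"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.0.0"
```

Running sbt assembly then produces a single jar containing kafka.serializer.StringDecoder and the other missing classes, which can be passed to spark-submit directly.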
Re: Recommended pipeline automation tool? Oozie?
You may look into the new Azkaban, which, while being quite heavyweight, is actually quite pleasant to use when set up. You can run Spark jobs (spark-submit) using Azkaban shell commands and pass parameters between jobs. It supports dependencies, simple DAGs, and scheduling with retries. I'm digging deeper, and it may be worthwhile extending it with a Spark job type... It's probably best for mixed Hadoop / Spark clusters...

— Sent from Mailbox

On Fri, Jul 11, 2014 at 12:52 AM, Andrei faithlessfri...@gmail.com wrote:

I used both Oozie and Luigi, but found them inflexible and still overcomplicated, especially in the presence of Spark.

Oozie has a fixed list of building blocks, which is pretty limiting. For example, you can launch a Hive query, but Impala, Shark/SparkSQL, etc. are out of scope (of course, you can always write a wrapper as a Java or Shell action, but does it really need to be so complicated?). Another issue with Oozie is passing variables between actions. There's the Oozie context, which is suitable for passing key-value pairs (both strings) between actions, but for more complex objects (say, a FileInputStream that should be closed at the last step only) you have to do some advanced kung fu.

Luigi, on the other hand, has its niche: complicated dataflows with many tasks that depend on each other. Basically, there are tasks (this is where you define computations) and targets (something that can exist: a file on disk, an entry in ZooKeeper, etc.). You ask Luigi to get some target, and it creates a plan for achieving this. Luigi is really shiny when your workflow fits this model, but one step away and you are in trouble. For example, consider a simple pipeline: run an MR job and output temporary data, run another MR job and output final data, clean the temporary data. You can make a target Clean that depends on target MRJob2 that, in its turn, depends on MRJob1, right? Not so easy. How do you check that the Clean task is achieved? If you just test whether the temporary directory is empty or not, you catch both cases: when all tasks are done and when they are not even started yet. Luigi allows you to specify all 3 actions (MRJob1, MRJob2, Clean) in a single run() method, but that ruins the entire idea.

And of course, both of these frameworks are optimized for standard MapReduce jobs, which is probably not what you want on the Spark mailing list :)

Experience with these frameworks, however, gave me some insights about typical data pipelines.

1. Pipelines are mostly linear. Oozie, Luigi, and a number of other frameworks allow branching, but most pipelines actually consist of moving data from source to destination with possibly some transformations in between (I'll be glad if somebody shares use cases where you really need branching).
2. Transactional logic is important. Either everything, or nothing. Otherwise it's really easy to get into an inconsistent state.
3. Extensibility is important. You never know what you will need in a week or two.

So eventually I decided that it is much easier to create your own pipeline instead of trying to adapt your code to existing frameworks. My latest pipeline incarnation simply consists of a list of steps that are started sequentially. Each step is a class with at least these methods:

* run() - launch this step
* fail() - what to do if the step fails
* finalize() - (optional) what to do when all steps are done

For example, if you want to add the possibility to run Spark jobs, you just create a SparkStep and configure it with the required code. If you want a Hive query, you just create a HiveStep and configure it with the Hive connection settings. I use a YAML file to configure the steps and a Context (basically, a Map[String, Any]) to pass variables between them. I also use a configurable Reporter, available to all steps, to report progress.

Hopefully this will give you some insights about the best pipeline for your specific case.

On Thu, Jul 10, 2014 at 9:10 PM, Paul Brown p...@mult.ifario.us wrote:

We use Luigi for this purpose. (Our pipelines are typically on AWS (no EMR), backed by S3, and using combinations of Python jobs, non-Spark Java/Scala, and Spark. We run Spark jobs by connecting drivers/clients to the master, and those are what is invoked from Luigi.)

— p...@mult.ifario.us | Multifarious, Inc. | http://mult.ifario.us/

On Thu, Jul 10, 2014 at 10:20 AM, k.tham kevins...@gmail.com wrote:

I'm just wondering what's the general recommendation for data pipeline automation. Say I want to run Spark Job A, then B, then invoke script C, then do D, and if D fails, do E, and if Job A fails, send email F, etc... It looks like Oozie might be the best choice, but I'd like some advice/suggestions. Thanks!

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Recommended-pipeline-automation-tool-Oozie-tp9319.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
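Andrei's step design, run()/fail()/optional finalize() with a Context map for passing variables, is easy to sketch. The Java below is a hypothetical reconstruction of that idea, not his actual code (his steps were configured from YAML); finalize() is renamed finish() only because java.lang.Object already defines a finalize method.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** One pipeline step, following the run()/fail()/finalize() shape described above. */
interface Step {
    void run(Map<String, Object> ctx) throws Exception; // launch this step
    default void fail(Exception e) {}                   // what to do if the step fails
    default void finish(Map<String, Object> ctx) {}     // optional: after all steps succeed
}

class Pipeline {
    /** Runs steps sequentially; stops at the first failure. Returns true on full success. */
    static boolean run(List<Step> steps, Map<String, Object> ctx) {
        List<Step> completed = new ArrayList<>();
        for (Step step : steps) {
            try {
                step.run(ctx);
                completed.add(step);
            } catch (Exception e) {
                step.fail(e); // transactional intent: signal failure, run nothing further
                return false;
            }
        }
        for (Step step : completed) {
            step.finish(ctx); // only reached when every step ran
        }
        return true;
    }
}
```

A hypothetical SparkStep or HiveStep would then just implement run(), shelling out to spark-submit or opening a Hive connection, and read its settings from the shared context.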
Re: Number of executors change during job running
Hi Tathagata, I also tried passing the number of partitions as a parameter to functions such as groupByKey. It seems the number of executors is around 50 instead of 300, which is the number of executors I specified in the submission script. Moreover, the running time of different executors is skewed. The ideal case is that Spark distributes the data evenly across 300 executors so that the computation can be finished efficiently. I am not sure how to achieve this. Thanks! Bill

On Thu, Jul 10, 2014 at 5:59 PM, Tathagata Das tathagata.das1...@gmail.com wrote:

Can you try setting the number of partitions explicitly in all the shuffle-based DStream operations? It may be that the default parallelism (that is, spark.default.parallelism) is not being respected. Regarding the unusual delay, I would look at the task details of that stage in the Spark web UI. It will show the breakdown of time for each task, including GC times, etc. That might give some indication. TD [...]
Re: Recommended pipeline automation tool? Oozie?
We use Azkaban for a short time and suffer a lot. Finally we almost rewrite it totally. Don’t recommend it really. 发件人: Nick Pentreath nick.pentre...@gmail.com 答复: user@spark.apache.org 日期: 2014年7月11日 星期五 下午3:18 至: user@spark.apache.org 主题: Re: Recommended pipeline automation tool? Oozie? You may look into the new Azkaban - which while being quite heavyweight is actually quite pleasant to use when set up. You can run spark jobs (spark-submit) using azkaban shell commands and pass paremeters between jobs. It supports dependencies, simple dags and scheduling with retries. I'm digging deeper and it may be worthwhile extending it with a Spark job type... It's probably best for mixed Hadoop / Spark clusters... ― Sent from Mailbox https://www.dropbox.com/mailbox On Fri, Jul 11, 2014 at 12:52 AM, Andrei faithlessfri...@gmail.com wrote: I used both - Oozie and Luigi - but found them inflexible and still overcomplicated, especially in presence of Spark. Oozie has a fixed list of building blocks, which is pretty limiting. For example, you can launch Hive query, but Impala, Shark/SparkSQL, etc. are out of scope (of course, you can always write wrapper as Java or Shell action, but does it really need to be so complicated?). Another issue with Oozie is passing variables between actions. There's Oozie context that is suitable for passing key-value pairs (both strings) between actions, but for more complex objects (say, FileInputStream that should be closed at last step only) you have to do some advanced kung fu. Luigi, on other hand, has its niche - complicated dataflows with many tasks that depend on each other. Basically, there are tasks (this is where you define computations) and targets (something that can exist - file on disk, entry in ZooKeeper, etc.). You ask Luigi to get some target, and it creates a plan for achieving this. Luigi is really shiny when your workflow fits this model, but one step away and you are in trouble. 
For example, consider simple pipeline: run MR job and output temporary data, run another MR job and output final data, clean temporary data. You can make target Clean, that depends on target MRJob2 that, in its turn, depends on MRJob1, right? Not so easy. How do you check that Clean task is achieved? If you just test whether temporary directory is empty or not, you catch both cases - when all tasks are done and when they are not even started yet. Luigi allows you to specify all 3 actions - MRJob1, MRJob2, Clean - in a single run() method, but ruins the entire idea. And of course, both of these frameworks are optimized for standard MapReduce jobs, which is probably not what you want on Spark mailing list :) Experience with these frameworks, however, gave me some insights about typical data pipelines. 1. Pipelines are mostly linear. Oozie, Luigi and number of other frameworks allow branching, but most pipelines actually consist of moving data from source to destination with possibly some transformations in between (I'll be glad if somebody share use cases when you really need branching). 2. Transactional logic is important. Either everything, or nothing. Otherwise it's really easy to get into inconsistent state. 3. Extensibility is important. You never know what will need in a week or two. So eventually I decided that it is much easier to create your own pipeline instead of trying to adopt your code to existing frameworks. My latest pipeline incarnation simply consists of a list of steps that are started sequentially. Each step is a class with at least these methods: * run() - launch this step * fail() - what to do if step fails * finalize() - (optional) what to do when all steps are done For example, if you want to add possibility to run Spark jobs, you just create SparkStep and configure it with required code. If you want Hive query - just create HiveStep and configure it with Hive connection settings. 
I use a YAML file to configure steps and a Context (basically, a Map[String, Any]) to pass variables between them. I also use a configurable Reporter, available to all steps, to report progress. Hopefully, this will give you some insights about the best pipeline for your specific case. On Thu, Jul 10, 2014 at 9:10 PM, Paul Brown p...@mult.ifario.us wrote: We use Luigi for this purpose. (Our pipelines are typically on AWS (no EMR) backed by S3 and using combinations of Python jobs, non-Spark Java/Scala, and Spark. We run Spark jobs by connecting drivers/clients to the master, and those are what is invoked from Luigi.) ― p...@mult.ifario.us | Multifarious, Inc. | http://mult.ifario.us/ On Thu, Jul 10, 2014 at 10:20 AM, k.tham kevins...@gmail.com wrote: I'm just wondering what's the general recommendation for data pipeline automation. Say, I want to run Spark Job A, then B, then invoke script C, then do D, and if D fails, do E, and if Job A fails, send email F, etc... It looks like Oozie
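Andrei's step-based design can be sketched in a few lines of Java. Names such as Step, Context and Pipeline are hypothetical illustrations of his description, not code from his project:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

// Context: a simple key-value bag passed between steps (Map[String, Any] in Scala).
class Context extends HashMap<String, Object> {}

// Each step is a class with run(), fail() and an optional end-of-pipeline hook.
interface Step {
    void run(Context ctx) throws Exception;          // launch this step
    default void fail(Context ctx, Exception e) {    // what to do if the step fails
        System.err.println("Step failed: " + e.getMessage());
    }
    default void cleanup(Context ctx) {}             // what to do when all steps are done
}

class Pipeline {
    private final List<Step> steps = new ArrayList<>();
    Pipeline add(Step s) { steps.add(s); return this; }

    // Steps run sequentially; on failure, the failing step's fail() hook fires
    // and the pipeline stops. cleanup() then runs for every step.
    boolean run(Context ctx) {
        boolean ok = true;
        for (Step s : steps) {
            try { s.run(ctx); }
            catch (Exception e) { s.fail(ctx, e); ok = false; break; }
        }
        for (Step s : steps) s.cleanup(ctx);
        return ok;
    }
}

public class PipelineDemo {
    public static void main(String[] args) {
        Context ctx = new Context();
        Pipeline p = new Pipeline()
            .add(c -> c.put("greeting", "hello"))
            .add(c -> System.out.println(c.get("greeting") + " from step 2"));
        System.out.println("pipeline ok: " + p.run(ctx)); // pipeline ok: true
    }
}
```

A SparkStep in this scheme would implement run() by launching spark-submit (or a driver) with code taken from the YAML configuration.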
Re: Recommended pipeline automation tool? Oozie?
Did you use the old Azkaban or Azkaban 2.5? It has been completely rewritten. Not saying it is the best, but I found it way better than Oozie, for example. Sent from my iPhone On 11 Jul 2014, at 09:24, 明风 mingf...@taobao.com wrote: We used Azkaban for a short time and suffered a lot. In the end we almost rewrote it completely. I really don't recommend it.
Re: KMeans code is rubbish
I also took a look at spark-1.0.0/examples/src/main/scala/org/apache/spark/examples/mllib/DenseKMeans.scala and ran the code in a shell. There is an issue here: val initMode = params.initializationMode match { case Random => KMeans.RANDOM case Parallel => KMeans.K_MEANS_PARALLEL } If I use initMode=KMeans.RANDOM everything is OK. If I use initMode=KMeans.K_MEANS_PARALLEL I get a wrong result. I do not know why. The example proposed is a really simple one that should not admit multiple solutions and should always converge to the correct one. Now what can be altered in the original SparkKMeans.scala (the seed or something else?) to get the correct results each and every single time? On Thursday, July 10, 2014 7:58 PM, Xiangrui Meng men...@gmail.com wrote: SparkKMeans is a naive implementation. Please use mllib.clustering.KMeans in practice. I created a JIRA for this: https://issues.apache.org/jira/browse/SPARK-2434 -Xiangrui On Thu, Jul 10, 2014 at 2:45 AM, Tathagata Das tathagata.das1...@gmail.com wrote: I ran the SparkKMeans example (not the mllib KMeans that Sean ran) with your dataset as well, and I got the expected answer. And I believe that even though initialization is done using sampling, the example actually sets the seed to a constant 42, so the result should always be the same no matter how many times you run it. So I am not really sure what's going on here. Can you tell us more about which version of Spark you are running? Which Java version? == [tdas @ Xion spark2] cat input 2 1 1 2 3 2 2 3 4 1 5 1 6 1 4 2 6 2 4 3 5 3 6 3 [tdas @ Xion spark2] ./bin/run-example SparkKMeans input 2 0.001 2014-07-10 02:45:06.764 java[45244:d17] Unable to load realm info from SCDynamicStore 14/07/10 02:45:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform...
using builtin-java classes where applicable 14/07/10 02:45:07 WARN LoadSnappy: Snappy native library not loaded 14/07/10 02:45:08 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS 14/07/10 02:45:08 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS Finished iteration (delta = 3.0) Finished iteration (delta = 0.0) Final centers: DenseVector(5.0, 2.0) DenseVector(2.0, 2.0) On Thu, Jul 10, 2014 at 2:17 AM, Wanda Hawk wanda_haw...@yahoo.com wrote: so this is what I am running: ./bin/run-example SparkKMeans ~/Documents/2dim2.txt 2 0.001 And this is the input file: ┌───[spark2013@SparkOne]──[~/spark-1.0.0].$ └───#!cat ~/Documents/2dim2.txt 2 1 1 2 3 2 2 3 4 1 5 1 6 1 4 2 6 2 4 3 5 3 6 3 This is the final output from spark: 14/07/10 20:05:12 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks 14/07/10 20:05:12 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 0 ms 14/07/10 20:05:12 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329 14/07/10 20:05:12 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks 14/07/10 20:05:12 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 0 ms 14/07/10 20:05:12 INFO Executor: Serialized size of result for 14 is 1433 14/07/10 20:05:12 INFO Executor: Sending result for 14 directly to driver 14/07/10 20:05:12 INFO Executor: Finished task ID 14 14/07/10 20:05:12 INFO DAGScheduler: Completed ResultTask(6, 0) 14/07/10 20:05:12 INFO TaskSetManager: Finished TID 14 in 5 ms on localhost (progress: 1/2) 14/07/10 20:05:12 INFO Executor: Serialized size of result for 15 is 1433 14/07/10 20:05:12 INFO Executor: Sending result for 15 directly to driver 14/07/10 20:05:12 INFO Executor: Finished task ID 15 14/07/10 20:05:12 INFO DAGScheduler: Completed ResultTask(6, 1) 14/07/10 
20:05:12 INFO TaskSetManager: Finished TID 15 in 7 ms on localhost (progress: 2/2) 14/07/10 20:05:12 INFO DAGScheduler: Stage 6 (collectAsMap at SparkKMeans.scala:75) finished in 0.008 s 14/07/10 20:05:12 INFO TaskSchedulerImpl: Removed TaskSet 6.0, whose tasks have all completed, from pool 14/07/10 20:05:12 INFO SparkContext: Job finished: collectAsMap at SparkKMeans.scala:75, took 0.02472681 s Finished iteration (delta = 0.0) Final centers: DenseVector(2.8571428571428568, 2.0) DenseVector(5.6005, 2.0) On Thursday, July 10, 2014 12:02 PM, Bertrand Dechoux decho...@gmail.com wrote: A picture is worth a thousand... Well, a picture with this dataset, what you are expecting and what you get, would help answering your initial question. Bertrand On Thu, Jul 10, 2014 at 10:44 AM, Wanda Hawk wanda_haw...@yahoo.com wrote: Can someone please run the standard kMeans code on this input with 2 centers ?: 2 1 1 2 3 2 2 3 4 1 5 1 6 1 4 2 6 2 4 3 5 3 6 3 The obvious result
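The 12-point dataset in this thread is small enough to run Lloyd's algorithm by hand, independent of Spark. A plain-Java sketch follows; the initial centers are hand-picked for illustration, since (as the thread shows) k-means can converge to different local optima for other initializations:

```java
import java.util.Arrays;

public class TinyKMeans {
    public static void main(String[] args) {
        double[][] pts = {{2,1},{1,2},{3,2},{2,3},{4,1},{5,1},
                          {6,1},{4,2},{6,2},{4,3},{5,3},{6,3}};
        // Initial centers (hand-picked; other seeds can reach other local optima,
        // which is exactly what the K_MEANS_PARALLEL vs RANDOM discussion is about).
        double[][] c = {{1,2},{6,2}};
        for (int iter = 0; iter < 20; iter++) {
            double[][] sum = new double[2][2];
            int[] n = new int[2];
            for (double[] p : pts) {
                int best = sq(p, c[0]) <= sq(p, c[1]) ? 0 : 1; // assign to nearest center
                sum[best][0] += p[0]; sum[best][1] += p[1]; n[best]++;
            }
            for (int j = 0; j < 2; j++) {           // move centers to cluster means
                c[j][0] = sum[j][0] / n[j];
                c[j][1] = sum[j][1] / n[j];
            }
        }
        System.out.println(Arrays.deepToString(c)); // [[2.0, 2.0], [5.0, 2.0]]
    }

    static double sq(double[] a, double[] b) {      // squared Euclidean distance
        double dx = a[0] - b[0], dy = a[1] - b[1];
        return dx * dx + dy * dy;
    }
}
```

This converges to (2.0, 2.0) and (5.0, 2.0), matching the DenseVector centers TD reports above.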
Re: Spark Streaming with Kafka NoClassDefFoundError
A simple sbt assembly is not working. Is there any other way to include particular jars with the assembly command? Regards, Dilip On Friday 11 July 2014 12:45 PM, Bill Jay wrote: I have met similar issues. The reason is probably that spark-streaming-kafka is not included in the Spark assembly. Currently, I am using Maven to generate a shaded package with all the dependencies. You may try to use sbt assembly to include the dependencies in your jar file. Bill On Thu, Jul 10, 2014 at 11:48 PM, Dilip dilip_ram...@hotmail.com wrote: Hi Akhil, Can you please guide me through this? Because the code I am running already has this in it: [java] SparkContext sc = new SparkContext(); sc.addJar("/usr/local/spark/external/kafka/target/scala-2.10/spark-streaming-kafka_2.10-1.1.0-SNAPSHOT.jar"); Is there something I am missing? Thanks, Dilip On Friday 11 July 2014 12:02 PM, Akhil Das wrote: The easiest fix would be adding the kafka jars to the SparkContext while creating it. Thanks Best Regards On Fri, Jul 11, 2014 at 4:39 AM, Dilip dilip_ram...@hotmail.com wrote: Hi, I am trying to run a program with Spark Streaming using Kafka on a standalone system.
These are my details: Spark 1.0.0 hadoop2 Scala 2.10.3 I am trying a simple program using my custom sbt project but this is the error I am getting: Exception in thread main java.lang.NoClassDefFoundError: kafka/serializer/StringDecoder at org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:55) at org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:94) at org.apache.spark.streaming.kafka.KafkaUtils.createStream(KafkaUtils.scala) at SimpleJavaApp.main(SimpleJavaApp.java:40) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:303) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.lang.ClassNotFoundException: kafka.serializer.StringDecoder at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) ... 
11 more. Here is my .sbt file: name := "Simple Project" version := "1.0" scalaVersion := "2.10.3" libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0" libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.0.0" libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.0.0" libraryDependencies += "org.apache.spark" %% "spark-examples" % "1.0.0" libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.0.0" libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.0" resolvers += "Akka Repository" at "http://repo.akka.io/releases/" resolvers += "Maven Repository" at "http://central.maven.org/maven2/" sbt package was successful. I also tried sbt ++2.10.3 package to build it for my Scala version. The problem remains the same. Can anyone help me out here? I've been stuck on this for quite some time now. Thank You, Dilip
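One way this kind of NoClassDefFoundError is commonly resolved (a sketch, not Dilip's actual build): mark the core Spark artifacts as "provided" so they come from the cluster's Spark runtime, and let sbt-assembly bundle only the Kafka connector and its transitive dependencies (including kafka.serializer.StringDecoder) into the fat jar:

```scala
// project/plugins.sbt (assumes the sbt-assembly plugin; version is illustrative)
// addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")

// build.sbt
name := "Simple Project"
version := "1.0"
scalaVersion := "2.10.3"

// Marked "provided": supplied by the Spark runtime, kept out of the fat jar.
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.0.0" % "provided"

// Not part of the Spark assembly, so it must go into the fat jar.
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.0.0"
```

Running sbt assembly then produces a jar that contains the Kafka classes, which spark-submit can ship as a whole.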
KMeans for large training data
Hi, I'm trying to use org.apache.spark.mllib.clustering.KMeans to do some basic clustering with Strings. My code works great when I use a five-figure amount of training elements. However, with for example 2 million elements, it gets extremely slow. A single stage may take up to 30 minutes. From the Web UI, I can see that it does these three things repeatedly: All of these tasks only use one executor, and on that executor only one core. And I can see a scheduler delay of about 25 seconds. I tried to use broadcast variables to speed this up, but maybe I'm using it wrong. The relevant code (where it gets slow) is this: What could I do to use more executors, and generally speed this up? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/KMeans-for-large-training-data-tp9407.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: KMeans for large training data
How many partitions do you use for your data? if the default is 1, you probably need to manually ask for more partitions. Also, I'd check that your executors aren't thrashing close to the GC limit. This can make things start to get very slow. On Fri, Jul 11, 2014 at 9:53 AM, durin m...@simon-schaefer.net wrote: Hi, I'm trying to use org.apache.spark.mllib.clustering.KMeans to do some basic clustering with Strings. My code works great when I use a five-figure amount of training elements. However, with for example 2 million elements, it gets extremely slow. A single stage may take up to 30 minutes. From the Web UI, I can see that it does these three things repeatedly: All of these tasks only use one executor, and on that executor only one core. And I can see a scheduler delay of about 25 seconds. I tried to use broadcast variables to speed this up, but maybe I'm using it wrong. The relevant code (where it gets slow) is this: What could I do to use more executors, and generally speed this up? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/KMeans-for-large-training-data-tp9407.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Streaming. Cannot get socketTextStream to receive anything.
Hi Akhil Das I have tried the nc -lk command too. I was hoping that System.out.println("Print text: " + arg0) would be printed when a stream is processed, i.e. when lines.flatMap(...) is called. But from my test with nc -lk, nothing is printed on the console at all. == To test whether the nc tool is working, I have also tested it with the Hercules TCP client test tool, and it works fine. So now the question goes back to why JavaDStream<String> words = lines.flatMap( new FlatMapFunction<String, String>() { @Override public Iterable<String> call(String arg0) throws Exception { System.out.println("Print text: " + arg0); return Arrays.asList(arg0.split(" ")); } }); is not printing the text I am sending through nc -lk. === Is there any other way to test if socketTextStream(...) is working? Regards. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Cannot-get-socketTextStream-to-receive-anything-tp9382p9409.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Streaming. Cannot get socketTextStream to receive anything.
I think I should be seeing every line of text that I have typed into the nc command. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Cannot-get-socketTextStream-to-receive-anything-tp9382p9410.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Streaming. Cannot get socketTextStream to receive anything.
Can you try this piece of code? SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount"); JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000)); JavaReceiverInputDStream<String> lines = ssc.socketTextStream(args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER); JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { @Override public Iterable<String> call(String x) { return Lists.newArrayList(SPACE.split(x)); } }); JavaPairDStream<String, Integer> wordCounts = words.mapToPair( new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String s) { return new Tuple2<String, Integer>(s, 1); } }).reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer i1, Integer i2) { return i1 + i2; } }); wordCounts.print(); ssc.start(); ssc.awaitTermination(); Taken from https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java Thanks Best Regards On Fri, Jul 11, 2014 at 9:58 AM, kytay kaiyang@gmail.com wrote: I think I should be seeing any line of text that I have typed in the nc command. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Cannot-get-socketTextStream-to-receive-anything-tp9382p9410.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark streaming - tasks and stages continue to be generated when using reduce by key
Hi TD: The input file is on HDFS. The file is approx 2.7 GB and when the process starts, there are 11 tasks (since the HDFS block size is 256M) for processing and 2 tasks for reduce by key. After the file has been processed, I see new stages with 2 tasks that continue to be generated. I understand this value (2) is the default value for spark.default.parallelism, but I don't quite understand how the value is determined for generating tasks for reduceByKey, how it is used besides reduceByKey, and what the optimal value for it should be. Thanks. On Thursday, July 10, 2014 7:24 PM, Tathagata Das tathagata.das1...@gmail.com wrote: How are you supplying the text file? On Wed, Jul 9, 2014 at 11:51 AM, M Singh mans6si...@yahoo.com wrote: Hi Folks: I am working on an application which uses spark streaming (version 1.1.0 snapshot on a standalone cluster) to process a text file and save counters in Cassandra based on fields in each row. I am testing the application in two modes: * Process each row and save the counter in Cassandra. In this scenario, after the text file has been consumed, there are no tasks/stages seen in the Spark UI. * If instead I use reduce by key before saving to Cassandra, the Spark UI shows continuous generation of tasks/stages even after processing the file has been completed. I believe this is because the reduce by key requires merging of data from different partitions. But I was wondering if anyone has any insights/pointers for understanding this difference in behavior and how to avoid generating tasks/stages when there is no data (new file) available. Thanks Mans
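As background for the "2 tasks" question: the number of reduce tasks equals the number of shuffle partitions, and each key is routed to a partition by hashing. A simplified plain-Java model of this routing (an illustration of the concept, not Spark's actual HashPartitioner source):

```java
import java.util.Map;
import java.util.TreeMap;

public class HashPartitionDemo {
    // Simplified model: partition = nonNegativeMod(key.hashCode(), numPartitions).
    static int partitionFor(Object key, int numPartitions) {
        int mod = key.hashCode() % numPartitions;
        return mod < 0 ? mod + numPartitions : mod;
    }

    public static void main(String[] args) {
        String[] keys = {"alpha", "beta", "gamma", "delta", "epsilon", "zeta"};
        // With numPartitions = 2 there can only ever be 2 reduce tasks,
        // no matter how many map-side tasks produced the shuffle data.
        Map<Integer, Integer> load = new TreeMap<>();
        for (String k : keys)
            load.merge(partitionFor(k, 2), 1, Integer::sum);
        System.out.println("keys per partition: " + load);
    }
}
```

Passing an explicit numPartitions to reduceByKey (or raising spark.default.parallelism) therefore directly raises the reduce-side task count.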
RE: All of the tasks have been completed but the Stage is still shown as Active?
I saw some exceptions like this in driver log. Can you shed some lights? Is it related with the behaviour? 14/07/11 20:40:09 ERROR LiveListenerBus: Listener JobProgressListener threw an exception java.util.NoSuchElementException: key not found: 64019 at scala.collection.MapLike$class.default(MapLike.scala:228) at scala.collection.AbstractMap.default(Map.scala:58) at scala.collection.mutable.HashMap.apply(HashMap.scala:64) at org.apache.spark.ui.jobs.JobProgressListener.onStageCompleted(JobProgressListener.scala:78) at org.apache.spark.scheduler.SparkListenerBus$$anonfun$postToAll$2.apply(SparkListenerBus.scala:48) at org.apache.spark.scheduler.SparkListenerBus$$anonfun$postToAll$2.apply(SparkListenerBus.scala:48) at org.apache.spark.scheduler.SparkListenerBus$$anonfun$foreachListener$1.apply(SparkListenerBus.scala:81) at org.apache.spark.scheduler.SparkListenerBus$$anonfun$foreachListener$1.apply(SparkListenerBus.scala:79) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.SparkListenerBus$class.foreachListener(SparkListenerBus.scala:79) at org.apache.spark.scheduler.SparkListenerBus$class.postToAll(SparkListenerBus.scala:48) at org.apache.spark.scheduler.LiveListenerBus.postToAll(LiveListenerBus.scala:32) at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:56) at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:56) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(LiveListenerBus.scala:56) at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply(LiveListenerBus.scala:47) at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply(LiveListenerBus.scala:47) at 
org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160) at org.apache.spark.scheduler.LiveListenerBus$$anon$1.run(LiveListenerBus.scala:46) From: Haopu Wang Sent: Thursday, July 10, 2014 7:38 PM To: user@spark.apache.org Subject: RE: All of the tasks have been completed but the Stage is still shown as Active? I didn't keep the driver's log. That's a lesson learned. I will try to run it again to see if it happens again. From: Tathagata Das [mailto:tathagata.das1...@gmail.com] Sent: 10 July 2014 17:29 To: user@spark.apache.org Subject: Re: All of the tasks have been completed but the Stage is still shown as Active? Do you see any errors in the logs of the driver? On Thu, Jul 10, 2014 at 1:21 AM, Haopu Wang hw...@qilinsoft.com wrote: I'm running an App for hours in a standalone cluster. From the data injector and the Streaming tab of the web UI, it's running well. However, I see quite a lot of Active stages in the web UI, even though some of them have all of their tasks completed. I attach a screenshot for your reference. Do you ever see this kind of behavior?
Iteration question
Hi, folks. We're having a problem with iteration that I don't understand. We have the following test code: org.apache.log4j.Logger.getLogger("org").setLevel(org.apache.log4j.Level.WARN) org.apache.log4j.Logger.getLogger("akka").setLevel(org.apache.log4j.Level.WARN) def test (caching: Boolean, points: Int, iterations: Int) { var coords = sc.parallelize(Array.fill(points)((0.0, 0.0)).zipWithIndex.map(_.swap)) if (caching) coords.cache coords.count var iteration = 0 val times = new Array[Double](iterations) do { val start = System.currentTimeMillis val thisIteration = iteration val increments = sc.parallelize(for (i <- 1 to points) yield (math.random, math.random)) val newcoords = coords.zip(increments).map(p => { if (0 == p._1._1) println("Processing iteration " + thisIteration) (p._1._1, (p._1._2._1 + p._2._1, p._1._2._2 + p._2._2)) }) if (caching) newcoords.cache newcoords.count if (caching) coords.unpersist(false) coords = newcoords val end = System.currentTimeMillis times(iteration) = (end - start) / 1000.0 println("Done iteration " + iteration + " in " + times(iteration) + " seconds") iteration = iteration + 1 } while (iteration < iterations) for (i <- 0 until iterations) { println("Iteration " + i + ": " + times(i)) } } If you run this on a local server with caching on and off, it appears that the caching does what it is supposed to do - only the latest iteration is processed each time through the loop. However, despite this, the time for each iteration still gets slower and slower. For example, calling test(true, 5000, 100), I get the following times (weeding out a few for brevity): Iteration 0: 0.084 Iteration 10: 0.381 Iteration 20: 0.674 Iteration 30: 0.975 Iteration 40: 1.254 Iteration 50: 1.544 Iteration 60: 1.802 Iteration 70: 2.147 Iteration 80: 2.469 Iteration 90: 2.715 Iteration 99: 2.962 That's a 35x increase between the first and last iteration, when it should be doing the same thing each time!
Without caching, the numbers are Iteration 0: 0.642 Iteration 10: 0.516 Iteration 20: 0.823 Iteration 30: 1.17 Iteration 40: 1.514 Iteration 50: 1.655 Iteration 60: 1.992 Iteration 70: 2.177 Iteration 80: 2.472 Iteration 90: 2.814 Iteration 99: 3.018 slightly slower - but not significantly. Does anyone know, if the caching is working, why is iteration 100 slower than iteration 1? And why is caching making so little difference? Thanks, -Nathan Kronenfeld -- Nathan Kronenfeld Senior Visualization Developer Oculus Info Inc 2 Berkeley Street, Suite 600, Toronto, Ontario M5A 4J5 Phone: +1-416-203-3003 x 238 Email: nkronenf...@oculusinfo.com
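One commonly cited mechanism behind this pattern (offered here as a toy model, not a diagnosis of Nathan's job) is lineage growth: each iteration's RDD references the previous one, so anything that walks or serializes the chain costs more every iteration unless the chain is truncated. A pure-Java illustration of a lazily chained computation where only materialized ("persisted") links stop the walk:

```java
import java.util.function.UnaryOperator;

// Toy model of an RDD lineage chain: each node lazily maps over its parent.
class LazyNode {
    final LazyNode parent;
    final UnaryOperator<double[]> f;
    double[] cached;               // non-null once persisted and materialized
    static int evaluations = 0;    // counts how many links a computation walks

    LazyNode(LazyNode parent, UnaryOperator<double[]> f) {
        this.parent = parent; this.f = f;
    }

    double[] compute() {
        if (cached != null) return cached;  // a cached link truncates the walk
        evaluations++;
        double[] in = (parent == null) ? new double[]{0} : parent.compute();
        return f.apply(in);
    }

    void persist() { cached = compute(); }
}

public class LineageDemo {
    public static void main(String[] args) {
        // Without persisting, evaluating iteration n re-walks the whole chain.
        LazyNode node = new LazyNode(null, x -> x);
        for (int i = 0; i < 100; i++)
            node = new LazyNode(node, x -> new double[]{x[0] + 1});
        LazyNode.evaluations = 0;
        node.compute();
        System.out.println("links walked without caching: " + LazyNode.evaluations); // 101

        // Persisting each step keeps the walk constant-length.
        node = new LazyNode(null, x -> x);
        node.persist();
        for (int i = 0; i < 100; i++) {
            node = new LazyNode(node, x -> new double[]{x[0] + 1});
            node.persist();
        }
        LazyNode fresh = new LazyNode(node, x -> new double[]{x[0] * 2});
        LazyNode.evaluations = 0;
        fresh.compute();
        System.out.println("links walked with caching: " + LazyNode.evaluations);    // 1
    }
}
```

In the real job, even with data cached, per-iteration metadata tied to the ever-growing chain can still grow; periodically checkpointing (which truncates lineage) is the usual remedy.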
Categorical Features for K-Means Clustering
Hi Folks, Does anyone have experience or recommendations on incorporating categorical features (attributes) into k-means clustering in Spark? In other words, I want to cluster on a set of attributes that include categorical variables. I know I could probably implement some custom code to parse and calculate my own similarity function, but I wanted to reach out before I did so. I'd also prefer to take advantage of the k-means|| (parallel) initialization feature of the model in MLlib, so an MLlib-based implementation would be preferred. Thanks in advance. Best, -Wen
Re: Categorical Features for K-Means Clustering
Since you can't define your own distance function, you will need to convert these to numeric dimensions. 1-of-n encoding can work OK, depending on your use case. So a dimension that takes on 3 categorical values becomes 3 dimensions, of which all are 0 except one that has value 1. On Fri, Jul 11, 2014 at 3:07 PM, Wen Phan wen.p...@mac.com wrote: Hi Folks, Does anyone have experience or recommendations on incorporating categorical features (attributes) into k-means clustering in Spark? In other words, I want to cluster on a set of attributes that include categorical variables. I know I could probably implement some custom code to parse and calculate my own similarity function, but I wanted to reach out before I did so. I'd also prefer to take advantage of the k-means|| (parallel) initialization feature of the model in MLlib, so an MLlib-based implementation would be preferred. Thanks in advance. Best, -Wen
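Sean's 1-of-n suggestion can be sketched in plain Java. The category list and encoding layout here are illustrative, not MLlib API:

```java
import java.util.Arrays;
import java.util.List;

public class OneHot {
    // Encode one categorical value as a 1-of-n (one-hot) vector.
    static double[] encode(String value, List<String> categories) {
        double[] v = new double[categories.size()];
        int idx = categories.indexOf(value);
        if (idx < 0) throw new IllegalArgumentException("unknown category: " + value);
        v[idx] = 1.0;
        return v;
    }

    public static void main(String[] args) {
        List<String> colors = Arrays.asList("red", "green", "blue");
        // "green" -> [0.0, 1.0, 0.0]; the Euclidean distance between any two
        // distinct categories is then the constant sqrt(2).
        System.out.println(Arrays.toString(encode("green", colors)));
    }
}
```

The full feature vector for k-means is then the numeric attributes concatenated with one such block per categorical attribute.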
Re: Categorical Features for K-Means Clustering
I see. So, basically, kind of like dummy variables in regression. Thanks, Sean. On Jul 11, 2014, at 10:11 AM, Sean Owen so...@cloudera.com wrote: Since you can't define your own distance function, you will need to convert these to numeric dimensions. 1-of-n encoding can work OK, depending on your use case. So a dimension that takes on 3 categorical values becomes 3 dimensions, of which all are 0 except one that has value 1.
Spark Streaming timing considerations
Hi, In the Spark Streaming paper, slack time has been suggested for delaying batch creation in the case of external timestamps. I don't see any such option in StreamingContext. Is it available in the API? Also, going through the previous posts, queueStream has been suggested for this. I looked into the queueStream example. // Create and push some RDDs into the queue for (i <- 1 to 30) { rddQueue += ssc.sparkContext.makeRDD(1 to 10) Thread.sleep(1000) } The only thing I am unsure of is how to make batches (basic RDDs) out of a stream coming in on a port. Regards, Laeeq
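The slack-time idea from the paper can be modeled outside Spark: assign each event to a batch by its external timestamp, and only close a batch once the wall clock has passed the batch's end plus the slack, so late arrivals within the slack window still land in the right batch. A hedged plain-Java sketch (not Spark Streaming API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class SlackBatcher {
    final long batchMs, slackMs;
    final Map<Long, List<String>> open = new TreeMap<>(); // batchStart -> events

    SlackBatcher(long batchMs, long slackMs) {
        this.batchMs = batchMs; this.slackMs = slackMs;
    }

    // Route an event to its batch by external timestamp, not arrival time.
    void add(long eventTs, String payload) {
        long batchStart = (eventTs / batchMs) * batchMs;
        open.computeIfAbsent(batchStart, k -> new ArrayList<>()).add(payload);
    }

    // A batch is emitted only once now >= batchEnd + slack.
    List<List<String>> closeBatches(long now) {
        List<List<String>> closed = new ArrayList<>();
        open.entrySet().removeIf(e -> {
            if (now >= e.getKey() + batchMs + slackMs) {
                closed.add(e.getValue());
                return true;
            }
            return false;
        });
        return closed;
    }
}
```

Each closed batch could then be turned into an RDD (e.g. with ssc.sparkContext.makeRDD) and pushed into the queue that queueStream consumes.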
Re: Using CQLSSTableWriter to batch load data from Spark to Cassandra.
Hi Gerard, This has been on my to-do list for a long time... I just published a Calliope snapshot built against Hadoop 2.2.x. Take it for a spin if you get a chance - You can get the jars from here - - https://oss.sonatype.org/service/local/repositories/snapshots/content/com/tuplejump/calliope_2.10/0.9.4-H2-SNAPSHOT/calliope_2.10-0.9.4-H2-SNAPSHOT.jar - https://oss.sonatype.org/service/local/repositories/snapshots/content/com/tuplejump/calliope-macros_2.10/0.9.4-H2-SNAPSHOT/calliope-macros_2.10-0.9.4-H2-SNAPSHOT.jar Or to use it from Maven - <dependency> <groupId>com.tuplejump</groupId> <artifactId>calliope_2.10</artifactId> <version>0.9.4-H2-SNAPSHOT</version> </dependency> and SBT - libraryDependencies += "com.tuplejump" % "calliope_2.10" % "0.9.4-H2-SNAPSHOT" It passes all the tests, so I am assuming all is fine, but we haven't tested it very extensively. Regards, Rohit *Founder CEO, **Tuplejump, Inc.* www.tuplejump.com *The Data Engineering Platform* On Fri, Jun 27, 2014 at 9:31 PM, Gerard Maas gerard.m...@gmail.com wrote: Hi Rohit, Thanks for your message. We are currently on Spark 0.9.1, Cassandra 2.0.6 and Calliope GA (Would love to try the pre-release version if you want beta testers :-) Our Hadoop version is CDH4.4 and of course our Spark assembly is compiled against it. We have got really interesting performance results from using Calliope and will probably try to compile it against Hadoop 2. Compared to the DataStax Java driver, out of the box, the Calliope lib gives us ~4.5x insert performance with higher network and CPU usage (which is what we want in batch insert mode = fast). With additional code optimizations using the DataStax driver, we were able to reduce that gap to 2x, but Calliope was still easier and faster to use. Will you be attending the Spark Summit? I'll be around. We'll be in touch in any case :-) -kr, Gerard. On Thu, Jun 26, 2014 at 11:03 AM, Rohit Rai ro...@tuplejump.com wrote: Hi Gerard, What versions of Spark, Hadoop, Cassandra and Calliope are you using?
We never built Calliope against Hadoop 2, as we and our clients either don't use Hadoop in our deployments or use it only as the infra component for Spark, in which case H1/H2 makes no difference. I know of at least one case where the user had built Calliope against 2.0 and was using it happily. If you need assistance with it we are here to help. Feel free to reach out to me directly and we can work out a solution for you. Regards, Rohit *Founder CEO, **Tuplejump, Inc.* www.tuplejump.com *The Data Engineering Platform* On Thu, Jun 26, 2014 at 12:44 AM, Gerard Maas gerard.m...@gmail.com wrote: Thanks Nick. We used the CassandraOutputFormat through Calliope. The Calliope API makes the CassandraOutputFormat quite accessible and is cool to work with. It worked fine at the prototype level, but we had Hadoop version conflicts when we put it in our Spark environment (using our Spark assembly compiled with CDH4.4). The conflict seems to be at the cassandra-all lib level, which is compiled against a different Hadoop version (v1). We could not get around that issue. (Any pointers in that direction?) That's why I'm trying the direct CQLSSTableWriter way, but it looks blocked as well. -kr, Gerard. On Wed, Jun 25, 2014 at 8:57 PM, Nick Pentreath nick.pentre...@gmail.com wrote: Can you not use a Cassandra OutputFormat? Seems they have BulkOutputFormat. An example of using it with Hadoop is here: http://shareitexploreit.blogspot.com/2012/03/bulkloadto-cassandra-with-hadoop.html Using it with Spark will be similar to these examples: https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala and https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala On Wed, Jun 25, 2014 at 8:44 PM, Gerard Maas gerard.m...@gmail.com wrote: Hi, (My excuses for the cross-post from SO) I'm trying to create Cassandra SSTables from the results of a batch computation in Spark.
Ideally, each partition should create the SSTable for the data it holds in order to parallelize the process as much as possible (and probably even stream it to the Cassandra ring as well). After the initial hurdles with the CQLSSTableWriter (like requiring the yaml file), I'm confronted now with this issue: java.lang.RuntimeException: Attempting to load already loaded column family customer.rawts at org.apache.cassandra.config.Schema.load(Schema.java:347) at org.apache.cassandra.config.Schema.load(Schema.java:112) at org.apache.cassandra.io.sstable.CQLSSTableWriter$Builder.forTable(CQLSSTableWriter.java:336) I'm creating a writer on each parallel partition like this: def store(rdd: RDD[Message]) = { rdd.foreachPartition( msgIterator => { val writer = CQLSSTableWriter.builder()
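For context, a rough sketch of what the per-partition writer pattern above might look like when completed. The table schema, column names, and output directory here are hypothetical, and the "already loaded column family" error in the stack trace suggests Cassandra's static Schema state makes it unsafe to build two writers for the same table in one JVM:

```scala
import org.apache.cassandra.io.sstable.CQLSSTableWriter
import org.apache.spark.rdd.RDD

// Hypothetical schema matching the customer.rawts table from the stack trace
val schema = "CREATE TABLE customer.rawts (id bigint PRIMARY KEY, payload text)"
val insert = "INSERT INTO customer.rawts (id, payload) VALUES (?, ?)"

def store(rdd: RDD[Message]) =
  rdd.foreachPartition { msgIterator =>
    // One writer per partition, writing SSTable files to a local scratch dir
    val writer = CQLSSTableWriter.builder()
      .inDirectory("/tmp/sstables/customer/rawts") // must already exist on the worker
      .forTable(schema)
      .using(insert)
      .build()
    // Message and its id/payload fields are assumed from the email's context
    msgIterator.foreach(m => writer.addRow(java.lang.Long.valueOf(m.id), m.payload))
    writer.close()
  }
```

The resulting SSTable directories would then still need to be streamed into the ring (e.g. via sstableloader), which is outside this sketch.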
Databricks demo
Hi, The Databricks demo at Spark Summit was amazing... what's the frontend stack used, specifically for rendering multiple reactive charts on the same DOM? Looks like that's an emerging pattern for correlating different data APIs... Thanks Deb
Re: KMeans code is rubbish
Hi Wanda, As Sean mentioned, K-means is not guaranteed to find an optimal answer, even for seemingly simple toy examples. A common heuristic to deal with this issue is to run K-means multiple times and choose the best answer. You can do this by changing the runs parameter from the default value (1) to something larger (say 10). -Ameet On Fri, Jul 11, 2014 at 1:20 AM, Wanda Hawk wanda_haw...@yahoo.com wrote: I also took a look at spark-1.0.0/examples/src/main/scala/org/apache/spark/examples/mllib/DenseKMeans.scala and ran the code in a shell. There is an issue here: val initMode = params.initializationMode match { case Random => KMeans.RANDOM case Parallel => KMeans.K_MEANS_PARALLEL } If I use initMode = KMeans.RANDOM everything is OK. If I use initMode = KMeans.K_MEANS_PARALLEL I get a wrong result. I do not know why. The example proposed is a really simple one that should not admit multiple solutions and should always converge to the correct one. Now what can be altered in the original SparkKMeans.scala (the seed or something else?) to get the correct results each and every single time? On Thursday, July 10, 2014 7:58 PM, Xiangrui Meng men...@gmail.com wrote: SparkKMeans is a naive implementation. Please use mllib.clustering.KMeans in practice. I created a JIRA for this: https://issues.apache.org/jira/browse/SPARK-2434 -Xiangrui On Thu, Jul 10, 2014 at 2:45 AM, Tathagata Das tathagata.das1...@gmail.com wrote: I ran the SparkKMeans example (not the mllib KMeans that Sean ran) with your dataset as well, and I got the expected answer. And I believe that even though initialization is done using sampling, the example actually sets the seed to a constant 42, so the result should always be the same no matter how many times you run it. So I am not really sure what's going on here. Can you tell us more about which version of Spark you are running? Which Java version?
== [tdas @ Xion spark2] cat input 2 1 1 2 3 2 2 3 4 1 5 1 6 1 4 2 6 2 4 3 5 3 6 3 [tdas @ Xion spark2] ./bin/run-example SparkKMeans input 2 0.001 2014-07-10 02:45:06.764 java[45244:d17] Unable to load realm info from SCDynamicStore 14/07/10 02:45:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 14/07/10 02:45:07 WARN LoadSnappy: Snappy native library not loaded 14/07/10 02:45:08 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS 14/07/10 02:45:08 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS Finished iteration (delta = 3.0) Finished iteration (delta = 0.0) Final centers: DenseVector(5.0, 2.0) DenseVector(2.0, 2.0) On Thu, Jul 10, 2014 at 2:17 AM, Wanda Hawk wanda_haw...@yahoo.com wrote: so this is what I am running: ./bin/run-example SparkKMeans ~/Documents/2dim2.txt 2 0.001 And this is the input file: ┌───[spark2013@SparkOne]──[~/spark-1.0.0].$ └───#!cat ~/Documents/2dim2.txt 2 1 1 2 3 2 2 3 4 1 5 1 6 1 4 2 6 2 4 3 5 3 6 3 This is the final output from spark: 14/07/10 20:05:12 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks 14/07/10 20:05:12 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 0 ms 14/07/10 20:05:12 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329 14/07/10 20:05:12 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks 14/07/10 20:05:12 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 0 ms 14/07/10 20:05:12 INFO Executor: Serialized size of result for 14 is 1433 14/07/10 20:05:12 INFO Executor: Sending result for 14 directly to driver 14/07/10 20:05:12 INFO Executor: Finished task ID 14 14/07/10 20:05:12 INFO DAGScheduler: Completed ResultTask(6, 0) 14/07/10 20:05:12 INFO TaskSetManager: 
Finished TID 14 in 5 ms on localhost (progress: 1/2) 14/07/10 20:05:12 INFO Executor: Serialized size of result for 15 is 1433 14/07/10 20:05:12 INFO Executor: Sending result for 15 directly to driver 14/07/10 20:05:12 INFO Executor: Finished task ID 15 14/07/10 20:05:12 INFO DAGScheduler: Completed ResultTask(6, 1) 14/07/10 20:05:12 INFO TaskSetManager: Finished TID 15 in 7 ms on localhost (progress: 2/2) 14/07/10 20:05:12 INFO DAGScheduler: Stage 6 (collectAsMap at SparkKMeans.scala:75) finished in 0.008 s 14/07/10 20:05:12 INFO TaskSchedulerImpl: Removed TaskSet 6.0, whose tasks have all completed, from pool 14/07/10 20:05:12 INFO SparkContext: Job finished: collectAsMap at SparkKMeans.scala:75, took 0.02472681 s Finished iteration (delta = 0.0) Final centers: DenseVector(2.8571428571428568, 2.0)
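Ameet's suggestion above (multiple runs, keep the best) can be sketched with the MLlib API of that era; the data path and the parameter values are placeholders:

```scala
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

// Parse the whitespace-separated 2-D points shown in the thread
val data = sc.textFile("2dim2.txt")
  .map(line => Vectors.dense(line.trim.split("\\s+").map(_.toDouble)))
  .cache()

// runs = 10: run k-means 10 times with different random initializations
// and keep the model with the lowest cost, instead of the default single run
val model = KMeans.train(data, 2, 20, 10)
println(model.clusterCenters.mkString(" "))
```

This is the mllib.clustering.KMeans that Xiangrui recommends over the naive SparkKMeans example, which takes no such parameter.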
Re: RDD join, index key: composite keys
I want to create an index on keyPnl on the pnl_type_code field, similar to an RDBMS index, so that the group by can be done efficiently. How do I achieve that? Currently the code below blows out of memory in Spark on 60GB of data. keyPnl is a very large file. We have been stuck for 1 week, trying Kryo, mapValues, etc., but to no avail. We want to partition on pnl_type_code but have no idea how to do that. Please advise. val keyPnl = pnl.filter(_.rf_level == 0).keyBy(f => f.portfolio_code) val keyPosition = positions.filter(_.pl0_code == 3).keyBy(f => f.portfolio_code) val JoinPnlPortfolio = keyPnl.leftOuterJoin(keyPosition) var result = JoinPnlPortfolio.groupBy(r => (r._2._1.pnl_type_code)) .mapValues(kv => (kv.map(mapper).fold (List[Double]()) (Vector.reduceVector _))) .mapValues(kv => (Var.percentile(kv, 0.99))) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/RDD-join-composite-keys-tp8696p9423.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
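One possible direction, sketched under the assumption that the per-group fold above is associative: key the records by pnl_type_code directly and replace groupBy with reduceByKey, so values are merged map-side instead of materializing whole groups in memory. The field names and helpers (mapper, Vector.reduceVector, Var.percentile) are taken from the code above; the partition count is an arbitrary example:

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._ // pair-RDD functions in pre-1.3 Spark

// Key by pnl_type_code up front so the shuffle partitions by it -
// this is the closest analogue to "indexing" on that field
val byType = JoinPnlPortfolio
  .map(r => (r._2._1.pnl_type_code, r._2._1))
  .partitionBy(new HashPartitioner(128))

// Merge per key incrementally instead of collecting each group into memory
val result = byType
  .mapValues(mapper)                  // record -> List[Double]
  .reduceByKey(Vector.reduceVector _) // associative merge, combined map-side
  .mapValues(kv => Var.percentile(kv, 0.99))
```

Unlike groupBy, reduceByKey never holds a full group in memory at once, which is usually what matters at the 60GB scale described.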
Re: Spark Streaming with Kafka NoClassDefFoundError
You may try to use this one: https://github.com/sbt/sbt-assembly I had an issue of duplicate files in the uber jar file. But I think this library will assemble dependencies into a single jar file. Bill On Fri, Jul 11, 2014 at 1:34 AM, Dilip dilip_ram...@hotmail.com wrote: A simple sbt assembly is not working. Is there any other way to include particular jars with assembly command? Regards, Dilip On Friday 11 July 2014 12:45 PM, Bill Jay wrote: I have met similar issues. The reason is probably because in Spark assembly, spark-streaming-kafka is not included. Currently, I am using Maven to generate a shaded package with all the dependencies. You may try to use sbt assembly to include the dependencies in your jar file. Bill On Thu, Jul 10, 2014 at 11:48 PM, Dilip dilip_ram...@hotmail.com wrote: Hi Akhil, Can you please guide me through this? Because the code I am running already has this in it: [java] SparkContext sc = new SparkContext(); sc.addJar(/usr/local/spark/external/kafka/target/scala-2.10/spark-streaming-kafka_2.10-1.1.0-SNAPSHOT.jar); Is there something I am missing? Thanks, Dilip On Friday 11 July 2014 12:02 PM, Akhil Das wrote: Easiest fix would be adding the kafka jars to the SparkContext while creating it. Thanks Best Regards On Fri, Jul 11, 2014 at 4:39 AM, Dilip dilip_ram...@hotmail.com wrote: Hi, I am trying to run a program with spark streaming using Kafka on a stand alone system. 
These are my details: Spark 1.0.0 hadoop2 Scala 2.10.3 I am trying a simple program using my custom sbt project but this is the error I am getting: Exception in thread main java.lang.NoClassDefFoundError: kafka/serializer/StringDecoder at org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:55) at org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:94) at org.apache.spark.streaming.kafka.KafkaUtils.createStream(KafkaUtils.scala) at SimpleJavaApp.main(SimpleJavaApp.java:40) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:303) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.lang.ClassNotFoundException: kafka.serializer.StringDecoder at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) ... 
11 more Here is my .sbt file: name := "Simple Project" version := "1.0" scalaVersion := "2.10.3" libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0" libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.0.0" libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.0.0" libraryDependencies += "org.apache.spark" %% "spark-examples" % "1.0.0" libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.0.0" libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.0" resolvers += "Akka Repository" at "http://repo.akka.io/releases/" resolvers += "Maven Repository" at "http://central.maven.org/maven2/" sbt package was successful. I also tried sbt ++2.10.3 package to build it for my Scala version. The problem remains the same. Can anyone help me out here? I've been stuck on this for quite some time now. Thank You, Dilip
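If the blocker with sbt assembly is duplicate files in the uber jar (as Bill mentions upthread), a typical workaround in the sbt-assembly plugin of that era was a custom merge strategy in build.sbt. The exact keys and import paths differ across plugin versions, so treat this as a sketch:

```scala
import sbtassembly.Plugin._
import AssemblyKeys._

assemblySettings

mergeStrategy in assembly <<= (mergeStrategy in assembly) { old =>
  {
    // Drop jar-signing metadata that collides across dependencies
    case PathList("META-INF", xs @ _*) => MergeStrategy.discard
    // Akka/Typesafe config files must be concatenated, not picked
    case "reference.conf"              => MergeStrategy.concat
    // Fall back to the plugin's default behavior for everything else
    case x                             => old(x)
  }
}
```

With this in place, `sbt assembly` produces a single fat jar that includes spark-streaming-kafka and the kafka client classes, so the NoClassDefFoundError for kafka.serializer.StringDecoder should no longer occur at submit time.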
RE: SPARK_CLASSPATH Warning
As mentioned, this is deprecated in Spark 1.0+. Try to use --driver-class-path: ./bin/spark-shell --driver-class-path yourlib.jar:abc.jar:xyz.jar Don't use a glob (*); specify the JARs one by one, separated by colons. Date: Wed, 9 Jul 2014 13:45:07 -0700 From: kat...@cs.pitt.edu Subject: SPARK_CLASSPATH Warning To: user@spark.apache.org Hello, I have installed Apache Spark v1.0.0 on a machine with a proprietary Hadoop distribution installed (v2.2.0 without YARN). Because the Hadoop distribution that I am using relies on a list of jars, I made the following changes to conf/spark-env.sh: #!/usr/bin/env bash export HADOOP_CONF_DIR=/path-to-hadoop-conf/hadoop-conf export SPARK_LOCAL_IP=impl41 export SPARK_CLASSPATH=/path-to-proprietary-hadoop-lib/lib/*:/path-to-proprietary-hadoop-lib/* ... Also, to make sure that I have everything working, I execute the Spark shell as follows: [biadmin@impl41 spark]$ ./bin/spark-shell --jars /path-to-proprietary-hadoop-lib/lib/*.jar 14/07/09 13:37:28 INFO spark.SecurityManager: Changing view acls to: biadmin 14/07/09 13:37:28 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(biadmin) 14/07/09 13:37:28 INFO spark.HttpServer: Starting HTTP Server 14/07/09 13:37:29 INFO server.Server: jetty-8.y.z-SNAPSHOT 14/07/09 13:37:29 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:44292 Welcome to __ / __/__ ___ _/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 1.0.0 /_/ Using Scala version 2.10.4 (IBM J9 VM, Java 1.7.0) Type in expressions to have them evaluated. Type :help for more information. 14/07/09 13:37:36 WARN spark.SparkConf: SPARK_CLASSPATH was detected (set to 'path-to-proprietary-hadoop-lib/*:/path-to-proprietary-hadoop-lib/lib/*'). This is deprecated in Spark 1.0+.
Please instead use: - ./spark-submit with --driver-class-path to augment the driver classpath - spark.executor.extraClassPath to augment the executor classpath 14/07/09 13:37:36 WARN spark.SparkConf: Setting 'spark.executor.extraClassPath' to '/path-to-proprietary-hadoop-lib/lib/*:/path-to-proprietary-hadoop-lib/*' as a work-around. 14/07/09 13:37:36 WARN spark.SparkConf: Setting 'spark.driver.extraClassPath' to '/path-to-proprietary-hadoop-lib/lib/*:/path-to-proprietary-hadoop-lib/*' as a work-around. 14/07/09 13:37:36 INFO spark.SecurityManager: Changing view acls to: biadmin 14/07/09 13:37:36 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(biadmin) 14/07/09 13:37:37 INFO slf4j.Slf4jLogger: Slf4jLogger started 14/07/09 13:37:37 INFO Remoting: Starting remoting 14/07/09 13:37:37 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@impl41:46081] 14/07/09 13:37:37 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@impl41:46081] 14/07/09 13:37:37 INFO spark.SparkEnv: Registering MapOutputTracker 14/07/09 13:37:37 INFO spark.SparkEnv: Registering BlockManagerMaster 14/07/09 13:37:37 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-local-20140709133737-798b 14/07/09 13:37:37 INFO storage.MemoryStore: MemoryStore started with capacity 307.2 MB. 
14/07/09 13:37:38 INFO network.ConnectionManager: Bound socket to port 16685 with id = ConnectionManagerId(impl41,16685) 14/07/09 13:37:38 INFO storage.BlockManagerMaster: Trying to register BlockManager 14/07/09 13:37:38 INFO storage.BlockManagerInfo: Registering block manager impl41:16685 with 307.2 MB RAM 14/07/09 13:37:38 INFO storage.BlockManagerMaster: Registered BlockManager 14/07/09 13:37:38 INFO spark.HttpServer: Starting HTTP Server 14/07/09 13:37:38 INFO server.Server: jetty-8.y.z-SNAPSHOT 14/07/09 13:37:38 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:21938 14/07/09 13:37:38 INFO broadcast.HttpBroadcast: Broadcast server started at http://impl41:21938 14/07/09 13:37:38 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-91e8e040-f2ca-43dd-b574-805033f476c7 14/07/09 13:37:38 INFO spark.HttpServer: Starting HTTP Server 14/07/09 13:37:38 INFO server.Server: jetty-8.y.z-SNAPSHOT 14/07/09 13:37:38 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:52678 14/07/09 13:37:38 INFO server.Server: jetty-8.y.z-SNAPSHOT 14/07/09 13:37:38 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040 14/07/09 13:37:38 INFO ui.SparkUI: Started SparkUI at http://impl41:4040 14/07/09 13:37:39 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 14/07/09 13:37:39 INFO spark.SparkContext: Added JAR file:/opt/ibm/biginsights/IHC/lib/adaptive-mr.jar at http://impl41:52678/jars/adaptive-mr.jar with timestamp 1404938259526 14/07/09 13:37:39 INFO executor.Executor: Using REPL class URI: http://impl41:44292 14/07/09
Re: Join two Spark Streaming
1. Since the RDD of the previous batch is used to create the RDD of the next batch, the lineage of dependencies in the RDDs continues to grow infinitely. That's not good, because it increases fault-recovery times, task sizes, etc. Checkpointing saves the data of an RDD to HDFS and truncates the lineage. 2. The code should have been the following. Sorry about the confusion. var uniqueValuesRDD: RDD[Int] = ... dstreamOfIntegers.transform(newDataRDD => { val newUniqueValuesRDD = newDataRDD.union(*uniqueValuesRDD*).distinct uniqueValuesRDD = newUniqueValuesRDD // periodically call uniqueValuesRDD.checkpoint() val uniqueCount = uniqueValuesRDD.count() newDataRDD.map(x => x / uniqueCount) }) On Fri, Jul 11, 2014 at 12:10 AM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi Tathagata, Thanks for the solution. Actually, I will use the number of unique integers in the batch instead of the accumulative number of unique integers. I do have two questions about your code: 1. Why do we need uniqueValuesRDD? Why do we need to call uniqueValuesRDD.checkpoint()? 2. Where is distinctValues defined? Bill On Thu, Jul 10, 2014 at 8:46 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Do you want to continuously maintain the set of unique integers seen since the beginning of the stream? var uniqueValuesRDD: RDD[Int] = ... dstreamOfIntegers.transform(newDataRDD => { val newUniqueValuesRDD = newDataRDD.union(distinctValues).distinct uniqueValuesRDD = newUniqueValuesRDD // periodically call uniqueValuesRDD.checkpoint() val uniqueCount = uniqueValuesRDD.count() newDataRDD.map(x => x / count) }) On Tue, Jul 8, 2014 at 11:03 AM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi all, I am working on a pipeline that needs to join two Spark streams. The input is a stream of integers. And the output is the number of each integer's appearances divided by the total number of unique integers. Suppose the input is: 1 2 3 1 2 2 There are 3 unique integers, and 1 appears twice.
Therefore, the output for the integer 1 will be: 1 0.67 Since the input is from a stream, it seems we need to first join the appearance of the integers and the total number of unique integers and then do a calculation using map. I am thinking of adding a dummy key to both streams and use join. However, a Cartesian product matches the application here better. How to do this effectively? Thanks! Bill
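For the per-batch version Bill describes (each integer's count divided by the number of unique integers in the same batch), one hedged sketch that avoids both the dummy-key join and the Cartesian product; dstreamOfIntegers stands for the input DStream[Int] from the thread:

```scala
// Inside transform, both the counts and the unique total come from the
// same batch RDD, so no cross-stream join is needed
val ratios = dstreamOfIntegers.transform { rdd =>
  val counts  = rdd.map(x => (x, 1L)).reduceByKey(_ + _) // integer -> appearances
  val uniques = rdd.distinct().count().toDouble          // unique integers this batch

  // e.g. for the batch 1 2 3 1 2 2: 1 -> 2/3 ≈ 0.67, 2 -> 3/3 = 1.0, 3 -> 1/3 ≈ 0.33
  counts.map { case (k, c) => (k, c / uniques) }
}
```

Note that the count() inside transform launches a job per batch, which is acceptable for small batches but worth keeping in mind.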
RE: spark-1.0.0-rc11 2f1dc868 spark-shell not honoring --properties-file option?
Ok, I found it on JIRA SPARK-2390: https://issues.apache.org/jira/browse/SPARK-2390 So it looks like this is a known issue. From: alee...@hotmail.com To: user@spark.apache.org Subject: spark-1.0.0-rc11 2f1dc868 spark-shell not honoring --properties-file option? Date: Tue, 8 Jul 2014 15:17:00 -0700 Build: Spark 1.0.0 rc11 (git commit tag: 2f1dc868e5714882cf40d2633fb66772baf34789) Hi All, When I enabled spark-defaults.conf for the event log, spark-shell broke while spark-submit works. I'm trying to create a separate directory per user to keep track of their own Spark job event logs, using the env variable $USER in spark-defaults.conf. Here's the spark-defaults.conf I specified so that the HistoryServer can start picking up these event logs from HDFS. As you can see, I was trying to create a directory for each user so they can store the event logs on a per-user basis. However, when I launch spark-shell, it didn't pick up $USER as the current login user, while this works for spark-submit. Here's more detail. /opt/spark/ is SPARK_HOME. [test@ ~]$ cat /opt/spark/conf/spark-defaults.conf # Default system properties included when running spark-submit. # This is useful for setting default environmental settings. # Example: # spark.master spark://master:7077 spark.eventLog.enabled true spark.eventLog.dir hdfs:///user/$USER/spark/logs/ # spark.serializer org.apache.spark.serializer.KryoSerializer And I tried to create a separate config file to override the default one: [test@ ~]$ SPARK_SUBMIT_OPTS=-XX:MaxPermSize=256m /opt/spark/bin/spark-shell --master yarn --driver-class-path /opt/hadoop/share/hadoop/mapreduce/lib/hadoop-lzo.jar --properties-file /home/test/spark-defaults.conf [test@ ~]$ cat /home/test/spark-defaults.conf # Default system properties included when running spark-submit. # This is useful for setting default environmental settings.
# Example: # spark.master spark://master:7077 spark.eventLog.enabled true spark.eventLog.dir hdfs:///user/test/spark/logs/ # spark.serializer org.apache.spark.serializer.KryoSerializer But it didn't work either; it is still looking at /opt/spark/conf/spark-defaults.conf. According to the document http://spark.apache.org/docs/latest/configuration.html, the precedence order is: hardcoded properties in SparkConf, then spark-submit / spark-shell options, then conf/spark-defaults.conf. There are 2 problems here: 1. In repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala, the SparkConf instance doesn't look for a user-specified spark-defaults.conf anywhere. I don't see anywhere that pulls in the file from the option --properties-file; it just uses the default location conf/spark-defaults.conf: val conf = new SparkConf() .setMaster(getMaster()) .setAppName("Spark shell") .setJars(jars) .set("spark.repl.class.uri", intp.classServer.uri) 2. The $USER isn't picked up in spark-shell. This may be a separate problem, perhaps fixed at the same time if spark-shell reuses the way SparkSubmit.scala populates SparkConf.
Re: Spark streaming - tasks and stages continue to be generated when using reduce by key
Whenever you do a shuffle-based operation like reduceByKey, groupByKey, join, etc., the system is essentially redistributing the data across the cluster, and it needs to know how many parts to divide the data into. That's where the default parallelism is used. TD On Fri, Jul 11, 2014 at 3:16 AM, M Singh mans6si...@yahoo.com wrote: Hi TD: The input file is on HDFS. The file is approx 2.7 GB, and when the process starts, there are 11 tasks (since the HDFS block size is 256M) for processing and 2 tasks for reduce by key. After the file has been processed, I see new stages with 2 tasks that continue to be generated. I understand this value (2) is the default value for spark.default.parallelism, but I don't quite understand how the value is determined for generating tasks for reduceByKey, how it is used besides reduceByKey, and what the optimal value for it should be. Thanks. On Thursday, July 10, 2014 7:24 PM, Tathagata Das tathagata.das1...@gmail.com wrote: How are you supplying the text file? On Wed, Jul 9, 2014 at 11:51 AM, M Singh mans6si...@yahoo.com wrote: Hi Folks: I am working on an application which uses Spark Streaming (version 1.1.0 snapshot on a standalone cluster) to process a text file and save counters in Cassandra based on fields in each row. I am testing the application in two modes: - Process each row and save the counter in Cassandra. In this scenario, after the text file has been consumed, there are no tasks/stages seen in the Spark UI. - If instead I use reduce by key before saving to Cassandra, the Spark UI shows continuous generation of tasks/stages even after processing of the file has been completed. I believe this is because reduce by key requires merging of data from different partitions. But I was wondering if anyone has any insights/pointers for understanding this difference in behavior and how to avoid generating tasks/stages when there is no data (new file) available. Thanks Mans
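Concretely, the per-operation override TD refers to is the optional numPartitions argument on the shuffle-based transformations; a sketch with hypothetical names, matching the 11 input splits mentioned in the thread:

```scala
// Explicit partition count instead of falling back to
// spark.default.parallelism (which is 2 in this thread)
val counts = lines
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _, 11) // 11 reduce tasks, one per input split

// Equivalent knob at the config level, applied to all shuffles:
// conf.set("spark.default.parallelism", "11")
```

Setting the count per operation is usually preferable, since different shuffles in the same job may want different parallelism.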
Re: How are the executors used in Spark Streaming in terms of receiver and driver program?
Hi Praveen, Thank you for the answer. That's interesting, because if I only bring up one executor for Spark Streaming, it seems only the receiver is working and no other tasks are happening, judging by the log and UI. Maybe it's just because the receiving task eats all the resources, not because one executor can only run one receiver? Fang, Yan yanfang...@gmail.com +1 (206) 849-4108 On Fri, Jul 11, 2014 at 6:06 AM, Praveen Seluka psel...@qubole.com wrote: Here are my answers, but I am just getting started with Spark Streaming - so please correct me if I am wrong. 1) Yes. 2) Receivers run on executors. It's actually a job that's submitted where the # of tasks equals the # of receivers. An executor can actually run more than one task at the same time, hence you could have more receivers than executors, but I think it's not recommended. 3) As said in 2, the executor where the receiver task is running can be used for map/reduce tasks. In yarn-cluster mode, the driver program is actually run as the application master (it lives in the first container that's launched), and this is not an executor - hence it's not used for other operations. 4) The driver runs in a separate container. I think the same executor can be used for the receiver and the processing tasks as well (this part I am not very sure of). On Fri, Jul 11, 2014 at 12:29 AM, Yan Fang yanfang...@gmail.com wrote: Hi all, I am working to improve the parallelism of a Spark Streaming application, but I have trouble understanding how the executors are used and how the application is distributed. 1. In YARN, does one executor equal one container? 2. I saw the statement that a streaming receiver runs on one worker machine (*note that each input DStream creates a single receiver (running on a worker machine) that receives a single stream of data*). Does "worker machine" mean the executor or the physical machine? If I have more receivers than executors, will it still work? 3.
Is the executor that holds the receiver also used for other operations, such as map and reduce, or is it fully occupied by the receiver? Similarly, if I run in yarn-cluster mode, is the executor running the driver program used by other operations too? 4. So if I have a driver program (cluster mode) and a streaming receiver, do I have to have at least 2 executors, because the program and the streaming receiver have to be on different executors? Thank you. Sorry for having so many questions, but I do want to understand how Spark Streaming is distributed in order to assign reasonable resources. Thank you again. Best, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108
Re: writing FLume data to HDFS
What is the error you are getting when you say "I was trying to write the data to hdfs.. but it fails"? TD On Thu, Jul 10, 2014 at 1:36 PM, Sundaram, Muthu X. muthu.x.sundaram@sabre.com wrote: I am new to Spark. I am trying to do the following: Netcat -> Flume -> Spark Streaming (process Flume data) -> HDFS. My Flume config file has the following setup: Source = netcat, Sink = avro sink. Spark Streaming code: I am able to print data from Flume to the monitor, but I am struggling to create a file. In order to get the real data I need to convert the SparkFlumeEvent to an AvroFlumeEvent. JavaRDD.saveAsTextFile() might not work, because the JavaRDD is a collection of SparkFlumeEvent. Do I need to convert this into a collection of JavaRDD<AvroFlumeEvent>? Please share any code examples... Thanks. Code: Duration batchInterval = new Duration(2000); SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount"); JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval); JavaDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port); flumeStream.count(); flumeStream.foreachRDD(new Function2<JavaRDD<SparkFlumeEvent>, JavaRDD<SparkFlumeEvent>, Void>() { @Override public Void call(JavaRDD<SparkFlumeEvent> events1, JavaRDD<SparkFlumeEvent> events2) throws Exception { events1.saveAsTextFile("output.txt"); return null; } }); /* flumeStream.count().map(new Function<Long, String>() { @Override public String call(Long in) { return "Received " + in + " flume events."; } }).print(); */ flumeStream.foreach(new Function<JavaRDD<SparkFlumeEvent>, Void>() { @Override public Void call(JavaRDD<SparkFlumeEvent> eventsData) throws Exception { String logRecord = null; List<SparkFlumeEvent> events = eventsData.collect(); Iterator<SparkFlumeEvent> batchedEvents = events.iterator(); long t1 = System.currentTimeMillis(); AvroFlumeEvent avroEvent = null; ByteBuffer bytePayload = null; // All the user level data is carried as payload in the Flume event while (batchedEvents.hasNext()) { SparkFlumeEvent flumeEvent = batchedEvents.next(); avroEvent = flumeEvent.event(); bytePayload = avroEvent.getBody(); logRecord = new String(bytePayload.array()); System.out.println("LOG RECORD = " + logRecord); // "I was trying to write the data to hdfs.. but it fails" here } System.out.println("Processed this batch in: " + (System.currentTimeMillis() - t1)/1000 + " seconds"); return null; } });
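As an alternative to collecting the events onto the driver, a hedged sketch of writing each batch straight to HDFS with DStream.saveAsTextFiles (shown in the Scala API for brevity; the HDFS path is a placeholder, and ssc/host/port come from the code above):

```scala
import org.apache.spark.streaming.flume.FlumeUtils

val flumeStream = FlumeUtils.createStream(ssc, host, port)

// Extract the Flume payload as a string, then write one HDFS
// directory per batch: hdfs://namenode/flume/events-<batchTime>.txt
flumeStream
  .map(e => new String(e.event.getBody.array()))
  .saveAsTextFiles("hdfs://namenode/flume/events", "txt")
```

This sidesteps the SparkFlumeEvent/AvroFlumeEvent conversion question by mapping each event to its payload string before the save, and keeps the write distributed rather than funneling everything through collect() on the driver.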
Re: Spark Streaming RDD to Shark table
Hi, I'm running into an identical issue running Spark 1.0.0 on Mesos 0.19. Were you able to get it sorted? There's no real documentation for the spark.httpBroadcast.uri except what's in the code - is this config setting required for running on a Mesos cluster? I'm running this in a dev environment with a simple 2 machine setup - the driver is running on dev-1, and dev-2 (10.0.0.5 in the below stack trace) has a mesos master, zookeeper, and mesos slave. Stack Trace: 14/07/11 18:00:05 INFO SparkEnv: Connecting to MapOutputTracker: akka.tcp://spark@dev-1:58136/user/MapOutputTracker 14/07/11 18:00:06 INFO SparkEnv: Connecting to BlockManagerMaster: akka.tcp://spark@dev-1:58136/user/BlockManagerMaster 14/07/11 18:00:06 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20140711180006-dea8 14/07/11 18:00:06 INFO MemoryStore: MemoryStore started with capacity 589.2 MB. 14/07/11 18:00:06 INFO ConnectionManager: Bound socket to port 60708 with id = ConnectionManagerId(10.0.0.5,60708) 14/07/11 18:00:06 INFO BlockManagerMaster: Trying to register BlockManager 14/07/11 18:00:06 INFO BlockManagerMaster: Registered BlockManager java.util.NoSuchElementException: spark.httpBroadcast.uri at org.apache.spark.SparkConf$$anonfun$get$1.apply(SparkConf.scala:149) at org.apache.spark.SparkConf$$anonfun$get$1.apply(SparkConf.scala:149) at scala.collection.MapLike$class.getOrElse(MapLike.scala:128) at scala.collection.AbstractMap.getOrElse(Map.scala:58) at org.apache.spark.SparkConf.get(SparkConf.scala:149) at org.apache.spark.broadcast.HttpBroadcast$.initialize(HttpBroadcast.scala:130) at org.apache.spark.broadcast.HttpBroadcastFactory.initialize(HttpBroadcastFactory.scala:31) at org.apache.spark.broadcast.BroadcastManager.initialize(BroadcastManager.scala:48) at org.apache.spark.broadcast.BroadcastManager.init(BroadcastManager.scala:35) at org.apache.spark.SparkEnv$.create(SparkEnv.scala:218) at org.apache.spark.executor.Executor.init(Executor.scala:85) at 
org.apache.spark.executor.MesosExecutorBackend.registered(MesosExecutorBackend.scala:56) Exception in thread Thread-2 I0711 18:00:06.454962 14037 exec.cpp:412] Deactivating the executor libprocess If I manually set the httpBroadcastUri to http://dev-1; I get the following error, I assume because I'm not setting the port correctly (which I don't think I have any way of knowing?) 14/07/11 18:31:27 ERROR Executor: Exception in task ID 4 java.net.ConnectException: Connection refused at java.net.PlainSocketImpl.socketConnect(Native Method) at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339) at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200) at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182) at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) at java.net.Socket.connect(Socket.java:579) at java.net.Socket.connect(Socket.java:528) at sun.net.NetworkClient.doConnect(NetworkClient.java:180) at sun.net.www.http.HttpClient.openServer(HttpClient.java:432) at sun.net.www.http.HttpClient.openServer(HttpClient.java:527) at sun.net.www.http.HttpClient.init(HttpClient.java:211) at sun.net.www.http.HttpClient.New(HttpClient.java:308) at sun.net.www.http.HttpClient.New(HttpClient.java:326) at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:996) at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:932) at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:850) at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1300) at org.apache.spark.broadcast.HttpBroadcast$.read(HttpBroadcast.scala:196) at org.apache.spark.broadcast.HttpBroadcast.readObject(HttpBroadcast.scala:89) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at
Re: Spark streaming - tasks and stages continue to be generated when using reduce by key
So, is it expected for the process to generate stages/tasks even after processing a file? Also, is there a way to figure out the file that is getting processed and when that process is complete? Thanks On Friday, July 11, 2014 1:51 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Whenever you need to do a shuffle-based operation like reduceByKey, groupByKey, join, etc., the system is essentially redistributing the data across the cluster and it needs to know how many parts it should divide the data into. That's where the default parallelism is used. TD On Fri, Jul 11, 2014 at 3:16 AM, M Singh mans6si...@yahoo.com wrote: Hi TD: The input file is on hdfs. The file is approx 2.7 GB and when the process starts, there are 11 tasks (since the hdfs block size is 256M) for processing and 2 tasks for reduce by key. After the file has been processed, I see new stages with 2 tasks that continue to be generated. I understand this value (2) is the default value for spark.default.parallelism, but I don't quite understand how the value is determined for generating tasks for reduceByKey, how it is used besides reduceByKey, and what the optimal value for this should be. Thanks. On Thursday, July 10, 2014 7:24 PM, Tathagata Das tathagata.das1...@gmail.com wrote: How are you supplying the text file? On Wed, Jul 9, 2014 at 11:51 AM, M Singh mans6si...@yahoo.com wrote: Hi Folks: I am working on an application which uses spark streaming (version 1.1.0 snapshot on a standalone cluster) to process a text file and save counters in cassandra based on fields in each row. I am testing the application in two modes: * Process each row and save the counter in cassandra. In this scenario, after the text file has been consumed, there are no tasks/stages seen in the spark UI. * If instead I use reduce by key before saving to cassandra, the spark UI shows continuous generation of tasks/stages even after processing of the file has been completed.
I believe this is because the reduce by key requires merging of data from different partitions. But I was wondering if anyone has any insights/pointers for understanding this difference in behavior and how to avoid generating tasks/stages when there is no data (new file) available. Thanks Mans
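TD's point above, that a shuffle-based operation has to decide how many parts to divide the data into, can be sketched in plain Python. This is only an illustration of hash partitioning, not Spark's actual implementation:

```python
# Illustration only (plain Python, no Spark): a shuffle-style operation
# picks a number of partitions N, then routes every record to a partition
# by hashing its key. This is the role spark.default.parallelism plays
# when reduceByKey is called without an explicit partition count.
NUM_PARTITIONS = 2  # the default the poster observed

records = [("apple", 1), ("banana", 1), ("apple", 1), ("cherry", 1)]

def partition_of(key, n=NUM_PARTITIONS):
    # Deterministic: the same key always lands in the same partition.
    return hash(key) % n

# Route records to partitions (the "shuffle write" side).
partitions = {}
for key, value in records:
    partitions.setdefault(partition_of(key), []).append((key, value))

# Reduce within each partition: sum values per key (the "reduce" side).
reduced = {}
for part in partitions.values():
    for key, value in part:
        reduced[key] = reduced.get(key, 0) + value
```

With a default parallelism of 2, every key lands in one of two partitions, which matches the two reduce-by-key tasks the poster keeps seeing regardless of input size.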
Top N predictions
Hello all, In our use case we would like to return the top 10 predicted values. I've looked at NaiveBayes and LogisticRegressionModel and cannot seem to find a way to get the predicted values for a vector - is this possible with mllib/spark? Thanks, Rich
Re: Top N predictions
I don't believe it is. Recently, when I needed to do this, I just copied out the underlying probability / margin function and calculated it from the model params. It's just a dot product. On Fri, Jul 11, 2014 at 7:48 PM, Rich Kroll rich.kr...@modernizingmedicine.com wrote: Hello all, In our use case we would like to return the top 10 predicted values. I've looked at NaiveBayes and LogisticRegressionModel and cannot seem to find a way to get the predicted values for a vector - is this possible with mllib/spark? Thanks, Rich
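Sean's suggestion can be sketched in plain Python. The weight vectors, intercepts, and class names below are made up for illustration; in practice they would come from the trained model's params, and sigmoid(w·x + b) is the probability form of the logistic-regression margin:

```python
import math

def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))

def predicted_probability(weights, intercept, features):
    # The "margin" is just a dot product plus the intercept;
    # the sigmoid turns it into a probability.
    margin = sum(w * x for w, x in zip(weights, features)) + intercept
    return sigmoid(margin)

# Hypothetical one-vs-rest setup: one (weights, intercept) pair per class.
models = {
    "class_a": ([0.5, -1.2, 0.3], 0.1),
    "class_b": ([-0.4, 0.9, 0.2], -0.3),
    "class_c": ([1.1, 0.0, -0.7], 0.0),
}
features = [1.0, 2.0, 0.5]

# Score every class, sort by probability, keep the top 10.
scored = sorted(
    ((label, predicted_probability(w, b, features)) for (label, (w, b)) in models.items()),
    key=lambda t: t[1],
    reverse=True,
)
top_n = scored[:10]  # here only three classes exist, highest first
```

The same ranking works on raw margins, since the sigmoid is monotonic.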
Job getting killed
I am trying to run Logistic Regression on the url dataset (from libsvm) using the exact same code as the example, on a 5-node YARN cluster. I get a pretty cryptic error that just says "Killed". Nothing more. Settings: --master yarn-client --verbose --driver-memory 24G --executor-memory 24G --executor-cores 8 --num-executors 5. I set the akka frame size to 200 MB. Script:

def main(args: Array[String]) {
  val conf = new SparkConf()
    .setMaster("yarn-client")
    .setAppName("Logistic regression SGD fixed")
    .set("spark.akka.frameSize", "200")
  var sc = new SparkContext(conf)
  // Load and parse the data
  val dataset = args(0)
  val maxIterations = 100
  val start_time = System.nanoTime()
  val data = MLUtils.loadLibSVMFile(sc, dataset)
  // Building the model
  var solver = new LogisticRegressionWithSGD()
  solver.optimizer.setNumIterations(maxIterations)
  solver.optimizer.setRegParam(0.01)
  val model = solver.run(data)
  // Measure the accuracy. Don't measure the time taken to do this.
  val predictionsAndLabels = data.map { point =>
    val prediction = model.predict(point.features)
    (prediction, point.label)
  }
  val accuracy = predictionsAndLabels.filter(r => r._1 == r._2).count.toDouble / data.count
  val elapsed_time = (System.nanoTime() - start_time) / 1e9
  // Use the last known accuracy
  println(dataset + ",spark-sgd," + maxIterations + "," + elapsed_time + "," + accuracy)
  System.exit(0)
}
MLlib feature request
Hi all, My company is actively using spark machine learning library, and we would love to see Gradient Boosting Machine algorithm (and perhaps Adaboost algorithm as well) being implemented. I’d greatly appreciate it if anyone could help to move it forward or to elevate this request. Thanks, Joseph
Re: KMeans for large training data
On Fri, Jul 11, 2014 at 7:32 PM, durin m...@simon-schaefer.net wrote:

How would you get more partitions?

You can specify this as the second arg to the methods that read your data originally, like: sc.textFile(..., 20)

I ran broadcastVector.value.repartition(5), but broadcastVector.value.partitions.size is still 1 and no change to the behavior is visible.

These are immutable, so to have an effect you have to do something like: val repartitioned = broadcastVector.value.repartition(5)

First of all, there is a gap of almost two minutes between the third-to-last and second-to-last line, where no activity is shown in the WebUI. Is that the GC at work? If yes, how would I improve this?

You mean there are a few minutes where no job is running? I assume that's time when the driver is busy doing something. Is it thrashing?

Also, Local KMeans++ reached the max number of iterations: 30 surprises me. I have ran training using is it possible that somehow, there are still 30 iterations executed, despite the 3 I set?

Are you sure you set 3 iterations?
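The immutability point above can be shown with a plain-Python analogy (illustration only, not Spark API): transformations return a new object and leave the receiver untouched, so the result must be captured, just as rdd.repartition(5) returns a new RDD while rdd.partitions stays at size 1.

```python
# Analogy for RDD immutability: sorted() returns a NEW list and leaves
# the original untouched, just as rdd.repartition(5) returns a new RDD
# and does not change the RDD it was called on.
original = [3, 1, 2]
result = sorted(original)  # must capture the return value to see the effect
```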
Re: Recommended pipeline automation tool? Oozie?
Just curious: how about using scala to drive the workflow? I guess if you use other tools (oozie, etc) you lose the advantage of reading from RDD -- you have to read from HDFS. Best regards, Wei - Wei Tan, PhD Research Staff Member IBM T. J. Watson Research Center http://researcher.ibm.com/person/us-wtan From: k.tham kevins...@gmail.com To: u...@spark.incubator.apache.org, Date: 07/10/2014 01:20 PM Subject:Recommended pipeline automation tool? Oozie? I'm just wondering what's the general recommendation for data pipeline automation. Say, I want to run Spark Job A, then B, then invoke script C, then do D, and if D fails, do E, and if Job A fails, send email F, etc... It looks like Oozie might be the best choice. But I'd like some advice/suggestions. Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Recommended-pipeline-automation-tool-Oozie-tp9319.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: MLlib feature request
Hi Joseph, Thanks for your email. Many users are requesting this functionality. While it would be a stretch for these to appear in Spark 1.1, various people (including Manish Amde and folks at the AMPLab, Databricks and Alpine Labs) are actively working on developing ensembles of decision trees (random forests, boosting). -Ameet On Fri, Jul 11, 2014 at 12:04 PM, Joseph Feng joseph.f...@creditkarma.com wrote: Hi all, My company is actively using spark machine learning library, and we would love to see Gradient Boosting Machine algorithm (and perhaps Adaboost algorithm as well) being implemented. I’d greatly appreciate it if anyone could help to move it forward or to elevate this request. Thanks, Joseph
not getting output from socket connection
Hi, I have a java application that is outputting a string every second. I'm running the wordcount example that comes with Spark 1.0, and running nc -lk . When I type words into the terminal running netcat, I get counts. However, when I write the String onto a socket on port , I don't get counts. I can see the strings showing up in the netcat terminal, but no counts from Spark. If I paste in the string, I get counts. Any ideas? Thanks
Re: not getting output from socket connection
I forgot to add that I get the same behavior if I tail -f | nc localhost on a log file. On Fri, Jul 11, 2014 at 1:25 PM, Walrus theCat walrusthe...@gmail.com wrote: Hi, I have a java application that is outputting a string every second. I'm running the wordcount example that comes with Spark 1.0, and running nc -lk . When I type words into the terminal running netcat, I get counts. However, when I write the String onto a socket on port , I don't get counts. I can see the strings showing up in the netcat terminal, but no counts from Spark. If I paste in the string, I get counts. Any ideas? Thanks
Re: not getting output from socket connection
netcat is listening for a connection on port . It is echoing what you type to its console to anything that connects to and reads. That is what Spark streaming does. If you yourself connect to and write, nothing happens except that netcat echoes it. This does not cause Spark to somehow get that data. nc is only echoing input from the console. On Fri, Jul 11, 2014 at 9:25 PM, Walrus theCat walrusthe...@gmail.com wrote: Hi, I have a java application that is outputting a string every second. I'm running the wordcount example that comes with Spark 1.0, and running nc -lk . When I type words into the terminal running netcat, I get counts. However, when I write the String onto a socket on port , I don't get counts. I can see the strings showing up in the netcat terminal, but no counts from Spark. If I paste in the string, I get counts. Any ideas? Thanks
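In other words, the roles are reversed: the poster's Java app needs to be the listening side that writes lines to whichever client connects, which is exactly what nc -lk does and what Spark's socketTextStream connects to. A minimal plain-Python sketch of that pattern (port choice and strings are illustrative; a client stands in for Spark's socket receiver):

```python
import socket
import threading

# The app acts as the server: it listens and WRITES lines to any client
# that connects (the direction socketTextStream expects). Port 0 means
# the OS picks a free port.
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("localhost", 0))
server.listen(1)
port = server.getsockname()[1]

def serve_one_line():
    conn, _ = server.accept()
    conn.sendall(b"hello spark streaming\n")  # data flows server -> client
    conn.close()

threading.Thread(target=serve_one_line, daemon=True).start()

# A client (standing in for Spark's receiver) connects and reads a line.
client = socket.create_connection(("localhost", port))
line = client.makefile().readline()
client.close()
```

Connecting to netcat and writing, as the poster did, only makes netcat echo the data; nothing flows toward Spark.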
Spark Questions
Hi all, We've been evaluating Spark for a long-term project. Although we've been reading several topics in the forum, any hints on the following topics will be extremely welcome: 1. Which data partition strategies are available in Spark? How configurable are these strategies? 2. What would be the best way to use Spark if queries can touch only 3-5 entries/records? Which strategy is best if they want to perform a full scan of the entries? 3. Is Spark capable of interacting with RDBMS? Thanks a lot! Best regards, -- *Gonzalo Zarza* | PhD in High-Performance Computing | Big-Data Specialist | *GLOBANT* | AR: +54 11 4109 1700 ext. 15494 | US: +1 877 215 5230 ext. 15494
Re: Spark streaming - tasks and stages continue to be generated when using reduce by key
The model for file stream is to pick up and process new files written atomically (by move) into a directory. So your file is being processed in a single batch, and then its waiting for any new files to be written into that directory. TD On Fri, Jul 11, 2014 at 11:46 AM, M Singh mans6si...@yahoo.com wrote: So, is it expected for the process to generate stages/tasks even after processing a file ? Also, is there a way to figure out the file that is getting processed and when that process is complete ? Thanks On Friday, July 11, 2014 1:51 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Whenever you need to do a shuffle=based operation like reduceByKey, groupByKey, join, etc., the system is essentially redistributing the data across the cluster and it needs to know how many parts should it divide the data into. Thats where the default parallelism is used. TD On Fri, Jul 11, 2014 at 3:16 AM, M Singh mans6si...@yahoo.com wrote: Hi TD: The input file is on hdfs. The file is approx 2.7 GB and when the process starts, there are 11 tasks (since hdfs block size is 256M) for processing and 2 tasks for reduce by key. After the file has been processed, I see new stages with 2 tasks that continue to be generated. I understand this value (2) is the default value for spark.default.parallelism but don't quite understand how is the value determined for generating tasks for reduceByKey, how is it used besides reduceByKey and what should be the optimal value for this. Thanks. On Thursday, July 10, 2014 7:24 PM, Tathagata Das tathagata.das1...@gmail.com wrote: How are you supplying the text file? On Wed, Jul 9, 2014 at 11:51 AM, M Singh mans6si...@yahoo.com wrote: Hi Folks: I am working on an application which uses spark streaming (version 1.1.0 snapshot on a standalone cluster) to process text file and save counters in cassandra based on fields in each row. I am testing the application in two modes: - Process each row and save the counter in cassandra. 
In this scenario after the text file has been consumed, there is no task/stages seen in the spark UI. - If instead I use reduce by key before saving to cassandra, the spark UI shows continuous generation of tasks/stages even after processing the file has been completed. I believe this is because the reduce by key requires merging of data from different partitions. But I was wondering if anyone has any insights/pointers for understanding this difference in behavior and how to avoid generating tasks/stages when there is no data (new file) available. Thanks Mans
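The atomic-move pattern TD describes can be sketched in plain Python. Local temp directories stand in for the monitored HDFS directory (in practice this would be an HDFS rename), and the file name is illustrative:

```python
import os
import tempfile

# Write the file OUTSIDE the watched directory first, then rename it in,
# so the file stream never sees a half-written file.
watched_dir = tempfile.mkdtemp()  # stands in for the monitored directory
staging_dir = tempfile.mkdtemp()

staged = os.path.join(staging_dir, "batch-0001.txt")
with open(staged, "w") as f:
    f.write("line 1\nline 2\n")  # may take a while for large files

# os.rename is atomic on the same POSIX filesystem: the file appears in
# the watched directory fully written, all at once.
final = os.path.join(watched_dir, "batch-0001.txt")
os.rename(staged, final)
```

After the move, the stream picks the file up in a single batch; with no further files arriving, later batches are simply empty, which is why empty stages keep being generated.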
Re: How are the executors used in Spark Streaming in terms of receiver and driver program?
The same executor can be used for both receiving and processing, irrespective of the deployment mode (yarn, spark standalone, etc.) It boils down to the number of cores / task slots that executor has. Each receiver is like a long running task, so each of them occupy a slot. If there are free slots in the executor then other tasks can be run on them. So if you are finding that the other tasks are being run, check how many cores/task slots the executor has and whether there are more task slots than the number of input dstream / receivers you are launching. @Praveen your answers were pretty much spot on, thanks for chipping in! On Fri, Jul 11, 2014 at 11:16 AM, Yan Fang yanfang...@gmail.com wrote: Hi Praveen, Thank you for the answer. That's interesting because if I only bring up one executor for the Spark Streaming, it seems only the receiver is working, no other tasks are happening, by checking the log and UI. Maybe it's just because the receiving task eats all the resource?, not because one executor can only run one receiver? Fang, Yan yanfang...@gmail.com +1 (206) 849-4108 On Fri, Jul 11, 2014 at 6:06 AM, Praveen Seluka psel...@qubole.com wrote: Here are my answers. But am just getting started with Spark Streaming - so please correct me if am wrong. 1) Yes 2) Receivers will run on executors. Its actually a job thats submitted where # of tasks equals # of receivers. An executor can actually run more than one task at the same time. Hence you could have more number of receivers than executors but its not recommended I think. 3) As said in 2, the executor where receiver task is running can be used for map/reduce tasks. In yarn-cluster mode, the driver program is actually run as application master (lives in the first container thats launched) and this is not an executor - hence its not used for other operations. 4) the driver runs in a separate container. 
I think the same executor can be used for receiver and the processing task also (this part am not very sure) On Fri, Jul 11, 2014 at 12:29 AM, Yan Fang yanfang...@gmail.com wrote: Hi all, I am working to improve the parallelism of the Spark Streaming application. But I have problem in understanding how the executors are used and the application is distributed. 1. In YARN, is one executor equal one container? 2. I saw the statement that a streaming receiver runs on one work machine (*n**ote that each input DStream creates a single receiver (running on a worker machine) that receives a single stream of data*). Does the work machine mean the executor or physical machine? If I have more receivers than the executors, will it still work? 3. Is the executor that holds receiver also used for other operations, such as map, reduce, or fully occupied by the receiver? Similarly, if I run in yarn-cluster mode, is the executor running driver program used by other operations too? 4. So if I have a driver program (cluster mode) and streaming receiver, do I have to have at least 2 executors because the program and streaming receiver have to be on different executors? Thank you. Sorry for having so many questions but I do want to understand how the Spark Streaming distributes in order to assign reasonable recourse.*_* Thank you again. Best, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108
Re: Number of executors change during job running
Can you show us the program that you are running. If you are setting number of partitions in the XYZ-ByKey operation as 300, then there should be 300 tasks for that stage, distributed on the 50 executors are allocated to your context. However the data distribution may be skewed in which case, you can use a repartition operation to redistributed the data more evenly (both DStream and RDD have repartition). TD On Fri, Jul 11, 2014 at 12:22 AM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi Tathagata, I also tried to use the number of partitions as parameters to the functions such as groupByKey. It seems the numbers of executors is around 50 instead of 300, which is the number of the executors I specified in submission script. Moreover, the running time of different executors is skewed. The ideal case is that Spark can distribute the data into 300 executors evenly so that the computation can be efficiently finished. I am not sure how to achieve this. Thanks! Bill On Thu, Jul 10, 2014 at 5:59 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Can you try setting the number-of-partitions in all the shuffle-based DStream operations, explicitly. It may be the case that the default parallelism (that is, spark.default.parallelism) is probably not being respected. Regarding the unusual delay, I would look at the task details of that stage in the Spark web ui. It will show break of time for each task, including GC times, etc. That might give some indication. TD On Thu, Jul 10, 2014 at 5:13 PM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi Tathagata, I set default parallelism as 300 in my configuration file. Sometimes there are more executors in a job. However, it is still slow. And I further observed that most executors take less than 20 seconds but two of them take much longer such as 2 minutes. The data size is very small (less than 480k lines with only 4 fields). I am not sure why the group by operation takes more then 3 minutes. Thanks! 
Bill On Thu, Jul 10, 2014 at 4:28 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Are you specifying the number of reducers in all the DStream.ByKey operations? If the reduce by key is not set, then the number of reducers used in the stages can keep changing across batches. TD On Wed, Jul 9, 2014 at 4:05 PM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi all, I have a Spark streaming job running on yarn. It consume data from Kafka and group the data by a certain field. The data size is 480k lines per minute where the batch size is 1 minute. For some batches, the program sometimes take more than 3 minute to finish the groupBy operation, which seems slow to me. I allocated 300 workers and specify 300 as the partition number for groupby. When I checked the slow stage *combineByKey at ShuffledDStream.scala:42,* there are sometimes 2 executors allocated for this stage. However, during other batches, the executors can be several hundred for the same stage, which means the number of executors for the same operations change. Does anyone know how Spark allocate the number of executors for different stages and how to increase the efficiency for task? Thanks! Bill
Re: executor failed, cannot find compute-classpath.sh
Hi CJ, Looks like I overlook a few lines in the spark shell case. It appears that spark shell explicitly overwrites https://github.com/apache/spark/blob/f4f46dec5ae1da48738b9b650d3de155b59c4674/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala#L955 spark.home to whatever SPARK_HOME is set to. I have filed a JIRA to track this issue: https://issues.apache.org/jira/browse/SPARK-2454. Until then, you can workaround this by ensuring we don't set SPARK_HOME anywhere. We currently do this in bin/spark-submit and bin/spark-class, so go ahead and remove these lines: https://github.com/apache/spark/blob/f4f46dec5ae1da48738b9b650d3de155b59c4674/bin/spark-submit#L20 https://github.com/apache/spark/blob/f4f46dec5ae1da48738b9b650d3de155b59c4674/bin/spark-class#L31 In addition, make sure spark-submit no longer uses the SPARK_HOME variable here: https://github.com/apache/spark/blob/f4f46dec5ae1da48738b9b650d3de155b59c4674/bin/spark-submit#L44 Now, as you have done before, setting spark.home to your executor's spark home should do the job. I have verified that this solves the problem in my own cluster. To verify that your configs are in fact set, you can always run bin/spark-submit (or spark-shell, which calls spark-submit) with the --verbose flag. Let me know if this fixes it. I will get to fixing the root problem soon. Andrew 2014-07-10 18:43 GMT-07:00 cjwang c...@cjwang.us: Andrew, Thanks for replying. I did the following and the result was still the same. 1. Added spark.home /root/spark-1.0.0 to local conf/spark-defaults.conf, where /root was the place in the cluster where I put Spark. 2. Ran bin/spark-shell --master spark://sjc1-eng-float01.carrieriq.com:7077. 3. 
Sighed when I still saw the same error: 14/07/10 18:26:53 INFO AppClient$ClientActor: Executor updated: app-20140711012651-0007/5 is now FAILED (class java.io.IOException: Cannot run program /Users/cwang/spark/bin/compute-classpath.sh (in directory .): error=2, No such file or directory) /Users/cwang/spark was my local SPARK_HOME, which is wrong. What did I do wrong? How do I know if the config file is taken? I am novice to Spark so spare with me. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/executor-failed-cannot-find-compute-classpath-sh-tp859p9378.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: How are the executors used in Spark Streaming in terms of receiver and driver program?
Hi Tathagata, Thank you. Is task slot equivalent to the core number? Or actually one core can run multiple tasks at the same time? Best, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108 On Fri, Jul 11, 2014 at 1:45 PM, Tathagata Das tathagata.das1...@gmail.com wrote: The same executor can be used for both receiving and processing, irrespective of the deployment mode (yarn, spark standalone, etc.) It boils down to the number of cores / task slots that executor has. Each receiver is like a long running task, so each of them occupy a slot. If there are free slots in the executor then other tasks can be run on them. So if you are finding that the other tasks are being run, check how many cores/task slots the executor has and whether there are more task slots than the number of input dstream / receivers you are launching. @Praveen your answers were pretty much spot on, thanks for chipping in! On Fri, Jul 11, 2014 at 11:16 AM, Yan Fang yanfang...@gmail.com wrote: Hi Praveen, Thank you for the answer. That's interesting because if I only bring up one executor for the Spark Streaming, it seems only the receiver is working, no other tasks are happening, by checking the log and UI. Maybe it's just because the receiving task eats all the resource?, not because one executor can only run one receiver? Fang, Yan yanfang...@gmail.com +1 (206) 849-4108 On Fri, Jul 11, 2014 at 6:06 AM, Praveen Seluka psel...@qubole.com wrote: Here are my answers. But am just getting started with Spark Streaming - so please correct me if am wrong. 1) Yes 2) Receivers will run on executors. Its actually a job thats submitted where # of tasks equals # of receivers. An executor can actually run more than one task at the same time. Hence you could have more number of receivers than executors but its not recommended I think. 3) As said in 2, the executor where receiver task is running can be used for map/reduce tasks. 
In yarn-cluster mode, the driver program is actually run as application master (lives in the first container thats launched) and this is not an executor - hence its not used for other operations. 4) the driver runs in a separate container. I think the same executor can be used for receiver and the processing task also (this part am not very sure) On Fri, Jul 11, 2014 at 12:29 AM, Yan Fang yanfang...@gmail.com wrote: Hi all, I am working to improve the parallelism of the Spark Streaming application. But I have problem in understanding how the executors are used and the application is distributed. 1. In YARN, is one executor equal one container? 2. I saw the statement that a streaming receiver runs on one work machine (*n**ote that each input DStream creates a single receiver (running on a worker machine) that receives a single stream of data*). Does the work machine mean the executor or physical machine? If I have more receivers than the executors, will it still work? 3. Is the executor that holds receiver also used for other operations, such as map, reduce, or fully occupied by the receiver? Similarly, if I run in yarn-cluster mode, is the executor running driver program used by other operations too? 4. So if I have a driver program (cluster mode) and streaming receiver, do I have to have at least 2 executors because the program and streaming receiver have to be on different executors? Thank you. Sorry for having so many questions but I do want to understand how the Spark Streaming distributes in order to assign reasonable recourse.*_* Thank you again. Best, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108
pyspark sc.parallelize running OOM with smallish data
spark_data_array here has about 35k rows with 4k columns. I have 4 nodes in the cluster and gave 48g to executors. also tried kyro serialization. traceback (most recent call last): File /mohit/./m.py, line 58, in module spark_data = sc.parallelize(spark_data_array) File /mohit/spark/python/pyspark/context.py, line 265, in parallelize jrdd = readRDDFromFile(self._jsc, tempFile.name, numSlices) File /mohit/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py, line 537, in __call__ File /mohit/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py, line 300, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.readRDDFromFile. : java.lang.OutOfMemoryError: Java heap space at org.apache.spark.api.python.PythonRDD$.readRDDFromFile(PythonRDD.scala:279) at org.apache.spark.api.python.PythonRDD.readRDDFromFile(PythonRDD.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) at py4j.Gateway.invoke(Gateway.java:259) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:207) at java.lang.Thread.run(Thread.java:745)
Re: Number of executors change during job running
Hi Tathagata, Below is my main function. I omit some filtering and data conversion functions. These functions are just one-to-one mappings, which should not noticeably increase the running time. The only reduce function I have here is groupByKey. There are 4 topics in my Kafka brokers and two of the topics have 240k lines each minute. The other two topics have less than 30k lines per minute. The batch size is one minute and I specified 300 executors in my spark-submit script. The default parallelism is 300.

val partition = 300
val zkQuorum = "zk1,zk2,zk3"
val group = "my-group-" + currentTime.toString
val topics = "topic1,topic2,topic3,topic4"
val numThreads = 4
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
ssc = new StreamingContext(conf, Seconds(batch))
ssc.checkpoint(hadoopOutput + "checkpoint")
val lines = lines1
lines.cache()
val jsonData = lines.map(JSON.parseFull(_))
val mapData = jsonData.filter(_.isDefined)
  .map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
val validMapData = mapData.filter(isValidData(_))
val fields = validMapData.map(data => (data("id").toString,
  timestampToUTCUnix(data("time").toString),
  timestampToUTCUnix(data("local_time").toString),
  data("id2").toString, data("id3").toString,
  data("log_type").toString, data("sub_log_type").toString))
val timeDiff = 3600L
val filteredFields = fields.filter(field => abs(field._2 - field._3) <= timeDiff)
val watchTimeFields = filteredFields.map(fields => (fields._1, fields._2, fields._4, fields._5, fields._7))
val watchTimeTuples = watchTimeFields.map(fields => getWatchtimeTuple(fields))
val programDuids = watchTimeTuples.map(fields => (fields._3, fields._1)).groupByKey(partition)
val programDuidNum = programDuids.map { case (key, value) => (key, value.toSet.size) }
programDuidNum.saveAsTextFiles(hadoopOutput + "result")

I have been working on this for several days and have no findings on why there are always 2 executors for the groupBy stage. Thanks a lot!
Bill On Fri, Jul 11, 2014 at 1:50 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Can you show us the program that you are running. If you are setting number of partitions in the XYZ-ByKey operation as 300, then there should be 300 tasks for that stage, distributed on the 50 executors are allocated to your context. However the data distribution may be skewed in which case, you can use a repartition operation to redistributed the data more evenly (both DStream and RDD have repartition). TD On Fri, Jul 11, 2014 at 12:22 AM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi Tathagata, I also tried to use the number of partitions as parameters to the functions such as groupByKey. It seems the numbers of executors is around 50 instead of 300, which is the number of the executors I specified in submission script. Moreover, the running time of different executors is skewed. The ideal case is that Spark can distribute the data into 300 executors evenly so that the computation can be efficiently finished. I am not sure how to achieve this. Thanks! Bill On Thu, Jul 10, 2014 at 5:59 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Can you try setting the number-of-partitions in all the shuffle-based DStream operations, explicitly. It may be the case that the default parallelism (that is, spark.default.parallelism) is probably not being respected. Regarding the unusual delay, I would look at the task details of that stage in the Spark web ui. It will show break of time for each task, including GC times, etc. That might give some indication. TD On Thu, Jul 10, 2014 at 5:13 PM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi Tathagata, I set default parallelism as 300 in my configuration file. Sometimes there are more executors in a job. However, it is still slow. And I further observed that most executors take less than 20 seconds but two of them take much longer such as 2 minutes. The data size is very small (less than 480k lines with only 4 fields). 
I am not sure why the group by operation takes more than 3 minutes. Thanks! Bill On Thu, Jul 10, 2014 at 4:28 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Are you specifying the number of reducers in all the DStream.ByKey operations? If the reduce by key is not set, then the number of reducers used in the stages can keep changing across batches. TD On Wed, Jul 9, 2014 at 4:05 PM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi all, I have a Spark streaming job running on YARN. It consumes data from Kafka and groups the data by a certain field. The data size is 480k lines per minute, where the batch size is 1 minute. For some batches, the program sometimes takes more than 3 minutes to finish the groupBy operation, which seems slow to me. I allocated 300 workers and specified 300 as the partition number for groupBy. When I checked the slow stage
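To make the repartition suggestion in this thread concrete, here is a minimal sketch (not the original program: `lines`, `keyOf`, and the partition count are placeholders) of forcing a shuffle's parallelism explicitly on a DStream:

```scala
// Sketch only, against the Spark 1.0-era streaming API.
// Assumes an existing DStream[String] named `lines` and a
// hypothetical key-extraction function `keyOf`.
val numPartitions = 300

// Redistribute skewed input early so later stages can use all executors.
val repartitioned = lines.repartition(numPartitions)

// Pass the partition count to the shuffle operation itself, so the
// number of reduce tasks does not fall back to a small default.
val grouped = repartitioned
  .map(line => (keyOf(line), line))
  .groupByKey(numPartitions)
```

This uses only operations the thread itself mentions (`repartition` on a DStream and an explicit partition count on `groupByKey`); the exact parallelism that helps will depend on the cluster, as discussed above.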
Graphx : optimal partitions for a graph and error in logs
Hi, I am trying GraphX on the LiveJournal data. I have a cluster of 17 nodes: 1 master and 16 workers. I have a few questions about this. * I built Spark from spark-master (to avoid the partitionBy error of Spark 1.0). * I am using edgeListFile() to load the data, and I figured I need to provide the number of partitions I want. The exact syntax I am using is the following: val graph = GraphLoader.edgeListFile(sc, filepath, true, 64).partitionBy(PartitionStrategy.RandomVertexCut) -- Is this a correct way to load the file to get the best performance? -- What should the number of partitions be: equal to the number of computing nodes, or to the number of cores? -- I see the following error so many times in my logs: ERROR BlockManagerWorker: Exception handling buffer message java.io.NotSerializableException: org.apache.spark.graphx.impl.ShippableVertexPartition Does it suggest that my graph wasn't partitioned properly? I suspect it affects performance? Please suggest whether I'm following every step correctly. Thanks in advance, -Shreyansh -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Graphx-optimal-partitions-for-a-graph-and-error-in-logs-tp9455.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: How to separate a subset of an RDD by day?
If you are on 1.0.0 release you can also try converting your RDD to a SchemaRDD and run a groupBy there. The SparkSQL optimizer may yield better results. It's worth a try at least. On Fri, Jul 11, 2014 at 5:24 PM, Soumya Simanta soumya.sima...@gmail.com wrote: Solution 2 is to map the objects into a pair RDD where the key is the number of the day in the interval, then group by key, collect, and parallelize the resulting grouped data. However, I worry collecting large data sets is going to be a serious performance bottleneck. Why do you have to do a collect ? You can do a groupBy and then write the grouped data to disk again
Re: How to separate a subset of an RDD by day?
ssimanta wrote Solution 2 is to map the objects into a pair RDD where the key is the number of the day in the interval, then group by key, collect, and parallelize the resulting grouped data. However, I worry collecting large data sets is going to be a serious performance bottleneck. Why do you have to do a collect ? You can do a groupBy and then write the grouped data to disk again I want to process the resulting data sets as RDDs, and groupBy only returns the data as a Seq. Thanks for the idea to write the grouped data back to disk. I think my best option is to partition my data in directories by day before running my Spark application, and then direct my Spark application to load RDDs from each directory when I want to load a date range. How does this sound? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-separate-a-subset-of-an-RDD-by-day-tp9454p9459.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Graphx : optimal partitions for a graph and error in logs
On Fri, Jul 11, 2014 at 2:23 PM, ShreyanshB shreyanshpbh...@gmail.com wrote: -- Is this a correct way to load the file to get the best performance? Yes, edgeListFile should be efficient at loading the edges. -- What should the number of partitions be: equal to computing nodes, or to cores? In general it should be a multiple of the number of cores to exploit all available parallelism, but because of shuffle overhead, it might help to use fewer partitions -- in some cases even fewer than the number of cores. You can measure the performance with different numbers of partitions to see what is best. -- I see the following error so many times in my logs [...] NotSerializableException This is a known bug, and there are two possible resolutions: 1. Switch from Java serialization to Kryo serialization, which is faster and will also resolve the problem, by setting the following Spark properties in conf/spark-defaults.conf:
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.kryo.registrator org.apache.spark.graphx.GraphKryoRegistrator
2. Mark the affected classes as Serializable. I'll submit a patch with this fix as well, but for now I'd suggest trying Kryo if possible. Ankur http://www.ankurdave.com/
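For reference, the same two properties can also be set programmatically on a SparkConf before the context is created -- a sketch using exactly the property names given above (the app name is hypothetical):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Equivalent to the conf/spark-defaults.conf entries above.
val conf = new SparkConf()
  .setAppName("graphx-kryo-example") // hypothetical app name
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
val sc = new SparkContext(conf)
```

Setting the properties in spark-defaults.conf has the advantage of applying to every application on the cluster, while the programmatic form keeps the choice local to one job.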
Re: Graphx : optimal partitions for a graph and error in logs
Thanks a lot Ankur, I'll follow that. One last quick question: does that error affect performance? ~Shreyansh -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Graphx-optimal-partitions-for-a-graph-and-error-in-logs-tp9455p9462.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: How to separate a subset of an RDD by day?
I think my best option is to partition my data in directories by day before running my Spark application, and then direct my Spark application to load RDD's from each directory when I want to load a date range. How does this sound? If your upstream system can write data by day then it makes perfect sense to do that and load (into Spark) only the data that is required for processing. This also saves you the filter step and hopefully time and memory. If you want to get back the bigger dataset you can always join multiple days of data (RDDs) together.
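A small sketch of the directory-per-day layout discussed above (the paths and dates are hypothetical), loading only the requested days and unioning them back into a single RDD when the bigger dataset is needed:

```scala
// Assumes an existing SparkContext `sc` and one directory per day,
// e.g. hdfs:///data/2014-07-10/, hdfs:///data/2014-07-11/.
val days = Seq("2014-07-10", "2014-07-11")        // the date range to load
val perDay = days.map(d => sc.textFile("hdfs:///data/" + d))

// Recombine the per-day RDDs into one RDD covering the whole range.
val range = sc.union(perDay)
```

This keeps each day addressable as its own RDD (avoiding the filter step entirely) while `sc.union` gives back the combined view on demand.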
Re: How to separate a subset of an RDD by day?
Sean Owen-2 wrote Can you not just filter the range you want, then groupBy timestamp/86400 ? That sounds like your solution 1 and is about as fast as it gets, I think. Are you thinking you would have to filter out each day individually from there, and that's why it would be slow? I don't think that's needed. You also don't need to map to pairs. I didn't make it clear in my first message that I want to obtain an RDD instead of an Iterable, and will be doing map-reduce like operations on the data by day. My problem is that groupBy returns an RDD[(K, Iterable[T])], but I really want an RDD[(K, RDD[T])]. Is there a better approach to this? I'm leaning towards partitioning my data by day on disk since all of my queries will always process data per day. However, the only problem I see with partitioning the data on disk is that it limits my system to cleanly work for a single timezone. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-separate-a-subset-of-an-RDD-by-day-tp9454p9464.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Streaming training@ Spark Summit 2014
Hi, I tried out the streaming program on the Spark training web page. I created a Twitter app as per the instructions (pointing to http://www.twitter.com). When I run the program, my credentials get printed out correctly, but thereafter my program just keeps waiting. It does not print out the hashtag count etc. My code appears below (essentially the same as what is on the training web page). I would like to know why I am not able to get a continuous stream and the hashtag count. thanks

// relevant code snippet
TutorialHelper.configureTwitterCredentials(apiKey, apiSecret, accessToken, accessTokenSecret)
val ssc = new StreamingContext(new SparkConf(), Seconds(1))
val tweets = TwitterUtils.createStream(ssc, None)
val statuses = tweets.map(status => status.getText())
statuses.print()
ssc.checkpoint(checkpointDir)
val words = statuses.flatMap(status => status.split(" "))
val hashtags = words.filter(word => word.startsWith("#"))
hashtags.print()
val counts = hashtags.map(tag => (tag, 1))
  .reduceByKeyAndWindow(_ + _, _ - _, Seconds(60 * 5), Seconds(1))
counts.print()
val sortedCounts = counts.map { case (tag, count) => (count, tag) }
  .transform(rdd => rdd.sortByKey(false))
sortedCounts.foreach(rdd => println("\nTop 10 hashtags:\n" + rdd.take(10).mkString("\n")))
ssc.start()
ssc.awaitTermination()
// end code snippet

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-training-Spark-Summit-2014-tp9465.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: How to separate a subset of an RDD by day?
On Fri, Jul 11, 2014 at 10:53 PM, bdamos a...@adobe.com wrote: I didn't make it clear in my first message that I want to obtain an RDD instead of an Iterable, and will be doing map-reduce like operations on the data by day. My problem is that groupBy returns an RDD[(K, Iterable[T])], but I really want an RDD[(K, RDD[T])]. Is there a better approach to this? Yeah, you can't have an RDD of RDDs. Why does it need to be an RDD -- because a day could have a huge amount of data? (Scala collections have map and reduce methods and the like too.) I think that if you really want RDDs you can just make a series of them, with some code like:

(start/86400 to end/86400).map(day => (day, rdd.filter(rec => rec.time >= day*86400 && rec.time < (day+1)*86400)))

I think that's your solution 1. I don't imagine it's that bad if this is what you need to do.
Re: Graphx : optimal partitions for a graph and error in logs
I don't think it should affect performance very much, because GraphX doesn't serialize ShippableVertexPartition in the fast path of mapReduceTriplets execution (instead it calls ShippableVertexPartition.shipVertexAttributes and serializes the result). I think it should only get serialized for speculative execution, if you have that enabled. By the way, here's the fix: https://github.com/apache/spark/pull/1376 Ankur http://www.ankurdave.com/
Re: Recommended pipeline automation tool? Oozie?
I like the idea of using Scala to drive the workflow. Spark already comes with a scheduler, so why not program a plugin to schedule other types of tasks (copy file, send email, etc.)? Scala could handle any logic required by the pipeline. Passing objects (including RDDs) between tasks is also easier. I don't know if this is an overuse of the Spark scheduler, but it sounds like a good tool. The only issue would be releasing resources that are not used at intermediate steps. On Fri, Jul 11, 2014 at 12:05 PM, Wei Tan w...@us.ibm.com wrote: Just curious: how about using Scala to drive the workflow? I guess if you use other tools (Oozie, etc.) you lose the advantage of reading from an RDD -- you have to read from HDFS. Best regards, Wei - Wei Tan, PhD Research Staff Member IBM T. J. Watson Research Center *http://researcher.ibm.com/person/us-wtan* http://researcher.ibm.com/person/us-wtan From: k.tham kevins...@gmail.com To: u...@spark.incubator.apache.org, Date: 07/10/2014 01:20 PM Subject: Recommended pipeline automation tool? Oozie? -- I'm just wondering what's the general recommendation for data pipeline automation. Say, I want to run Spark Job A, then B, then invoke script C, then do D, and if D fails, do E, and if Job A fails, send email F, etc... It looks like Oozie might be the best choice. But I'd like some advice/suggestions. Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Recommended-pipeline-automation-tool-Oozie-tp9319.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- Li @vrilleup
Re: confirm subscribe to user@spark.apache.org
On Fri, Jul 11, 2014 at 3:11 PM, user-h...@spark.apache.org wrote: Hi! This is the ezmlm program. I'm managing the user@spark.apache.org mailing list. To confirm that you would like veera...@gmail.com added to the user mailing list, please send a short reply to this address: user-sc.1405116686.kijenhjamnjaodhflpgc-veeran54= gmail@spark.apache.org Usually, this happens when you just hit the reply button. If this does not work, simply copy the address and paste it into the To: field of a new message. This confirmation serves two purposes. First, it verifies that I am able to get mail through to you. Second, it protects you in case someone forges a subscription request in your name. Please note that ALL Apache dev- and user- mailing lists are publicly archived. Do familiarize yourself with Apache's public archive policy at http://www.apache.org/foundation/public-archives.html prior to subscribing and posting messages to user@spark.apache.org. If you're not sure whether or not the policy applies to this mailing list, assume it does unless the list name contains the word private in it. Some mail programs are broken and cannot handle long addresses. If you cannot reply to this request, instead send a message to user-requ...@spark.apache.org and put the entire address listed above into the Subject: line. --- Administrative commands for the user list --- I can handle administrative requests automatically. Please do not send them to the list address! 
Re: Using HQL is terribly slow: Potential Performance Issue
Hey Jerry, When you ran these queries using different methods, did you see any discrepancy in the returned results (i.e. the counts)? On Thu, Jul 10, 2014 at 5:55 PM, Michael Armbrust mich...@databricks.com wrote: Yeah, sorry. I think you are seeing some weirdness with partitioned tables that I have also seen elsewhere. I've created a JIRA and assigned someone at Databricks to investigate. https://issues.apache.org/jira/browse/SPARK-2443 On Thu, Jul 10, 2014 at 5:33 PM, Jerry Lam chiling...@gmail.com wrote: Hi Michael, Yes, the table is partitioned on 1 column. There are 11 columns in the table and they are all String type. I understand that SerDes contributes some overhead, but using pure Hive, we could run the query about 5 times faster than Spark SQL. Given that Hive also has the same SerDes overhead, there must be something additional that Spark SQL adds to the overall overhead that Hive doesn't have. Best Regards, Jerry On Thu, Jul 10, 2014 at 7:11 PM, Michael Armbrust mich...@databricks.com wrote: On Thu, Jul 10, 2014 at 2:08 PM, Jerry Lam chiling...@gmail.com wrote: For the curious mind, the dataset is about 200-300GB and we are using 10 machines for this benchmark. Given the env is equal between the two experiments, why is pure Spark faster than Spark SQL? There is going to be some overhead to parsing data using the Hive SerDes instead of the native Spark code, however, the slowdown you are seeing here is much larger than I would expect. Can you tell me more about the table? What does the schema look like? Is it partitioned? By the way, I also tried hql("select * from m").count. It is terribly slow too. FYI, this query is actually identical to the one where you write out COUNT(*).
Re: Streaming training@ Spark Summit 2014
You don't get any exception from twitter.com, saying credential error or something? I have seen this happen when someone was behind a VPN to their office, and twitter was probably blocked in their office. You could be having a similar issue. TD On Fri, Jul 11, 2014 at 2:57 PM, SK skrishna...@gmail.com wrote: Hi, I tried out the streaming program on the Spark training web page. I created a Twitter app as per the instructions (pointing to http://www.twitter.com). When I run the program, my credentials get printed out correctly, but thereafter my program just keeps waiting. It does not print out the hashtag count etc. My code appears below (essentially the same as what is on the training web page). I would like to know why I am not able to get a continuous stream and the hashtag count. thanks

// relevant code snippet
TutorialHelper.configureTwitterCredentials(apiKey, apiSecret, accessToken, accessTokenSecret)
val ssc = new StreamingContext(new SparkConf(), Seconds(1))
val tweets = TwitterUtils.createStream(ssc, None)
val statuses = tweets.map(status => status.getText())
statuses.print()
ssc.checkpoint(checkpointDir)
val words = statuses.flatMap(status => status.split(" "))
val hashtags = words.filter(word => word.startsWith("#"))
hashtags.print()
val counts = hashtags.map(tag => (tag, 1))
  .reduceByKeyAndWindow(_ + _, _ - _, Seconds(60 * 5), Seconds(1))
counts.print()
val sortedCounts = counts.map { case (tag, count) => (count, tag) }
  .transform(rdd => rdd.sortByKey(false))
sortedCounts.foreach(rdd => println("\nTop 10 hashtags:\n" + rdd.take(10).mkString("\n")))
ssc.start()
ssc.awaitTermination()
// end code snippet

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-training-Spark-Summit-2014-tp9465.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
try JDBC server
Hi all, I would like to give the JDBC server (which is supposed to be released in 1.1) a try. Where can I find the documentation about that? Best, -- Nan Zhu
Re: Some question about SQL and streaming
Yes, even though we don't have immediate plans, I definitely would like to see it happen some time in the not-so-distant future. TD On Thu, Jul 10, 2014 at 7:55 PM, Shao, Saisai saisai.s...@intel.com wrote: No specific plans to do so, since there would be some functional loss, like the time-based windowing function which is important for streaming SQL. Also, keeping compatible with the fast-growing Spark SQL is quite hard. So no clear plans to submit to upstream. -Jerry *From:* Tobias Pfeiffer [mailto:t...@preferred.jp] *Sent:* Friday, July 11, 2014 10:47 AM *To:* user@spark.apache.org *Subject:* Re: Some question about SQL and streaming Hi, On Fri, Jul 11, 2014 at 11:38 AM, Shao, Saisai saisai.s...@intel.com wrote: Actually we have a POC project which shows the power of combining Spark Streaming and Catalyst. It can manipulate SQL on top of Spark Streaming and get a SchemaDStream. You can take a look at it: https://github.com/thunderain-project/StreamSQL Wow, that looks great! Any plans to get this code (or functionality) merged into Spark? Tobias
Re: Generic Interface between RDD and DStream
I totally agree that if we are able to do this it will be very cool. However, this requires having a common trait / interface between RDD and DStream, which we don't have as of now. It would be very cool though. On my wish list for sure. TD On Thu, Jul 10, 2014 at 11:53 AM, mshah shahmaul...@gmail.com wrote: I wanted to get a perspective on how to share code between Spark batch processing and Spark Streaming. For example, I want to get the unique tweets stored in an HDFS file in both Spark batch and Spark Streaming. Currently I will have to do the following: Tweet { String tweetText; String userId; }

Spark Batch:
tweets = sparkContext.newHadoopApiAsFile(tweet);
def getUniqueTweets(tweets: RDD[Tweet]) = {
  tweets.map(tweet => (tweetText, tweet)).groupByKey(tweetText).map((tweetText, _) => tweetText)
}

Spark Streaming:
tweets = streamingContext.fileStream(tweet);
def getUniqueTweets(tweets: DStream[Tweet]) = {
  tweets.map(tweet => (tweetText, tweet)).groupByKey(tweetText).map((tweetText, _) => tweetText)
}

The above example shows I am doing the same thing, but I have to replicate the code as there is no common abstraction between DStream and RDD, or between SparkContext and StreamingContext. If there were a common abstraction it would have been much simpler:

tweets = context.read(tweet, Stream or Batch)
def getUniqueTweets(tweets: SparkObject[Tweet]) = {
  tweets.map(tweet => (tweetText, tweet)).groupByKey(tweetText).map((tweetText, _) => tweetText)
}

I would appreciate thoughts on it. Is it already available? Is there any plan to add this support? Is it intentionally not supported because of a design choice? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Generic-Interface-between-RDD-and-DStream-tp9331.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: try JDBC server
nvm for others with the same question: https://github.com/apache/spark/commit/8032fe2fae3ac40a02c6018c52e76584a14b3438 -- Nan Zhu On Friday, July 11, 2014 at 7:02 PM, Nan Zhu wrote: Hi, all I would like to give a try on JDBC server (which is supposed to be released in 1.1) where can I find the document about that? Best, -- Nan Zhu
Re: Streaming training@ Spark Summit 2014
I don't get any exceptions or error messages. I tried it both with and without VPN and had the same outcome. But I can try again without VPN later today and report back. thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-training-Spark-Summit-2014-tp9465p9477.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Number of executors change during job running
Hi folks, I just ran another job that only received data from Kafka, did some filtering, and then saved the results as text files in HDFS. There was no reduce work involved. Surprisingly, the number of executors for the saveAsTextFiles stage was also 2, although I specified 300 executors in the job submission. As a result, the simple save-file action took more than 2 minutes. Do you have any idea how Spark determines the number of executors for different stages? Thanks! Bill On Fri, Jul 11, 2014 at 2:01 PM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi Tathagata, Below is my main function. I omit some filtering and data conversion functions. These functions are just one-to-one mappings, which should not noticeably increase the running time. The only reduce function I have here is groupByKey. There are 4 topics in my Kafka brokers and two of the topics have 240k lines each minute. The other two topics have less than 30k lines per minute. The batch size is one minute and I specified 300 executors in my spark-submit script. The default parallelism is 300. 
val partition = 300
val zkQuorum = "zk1,zk2,zk3"
val group = "my-group-" + currentTime.toString
val topics = "topic1,topic2,topic3,topic4"
val numThreads = 4
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
ssc = new StreamingContext(conf, Seconds(batch))
ssc.checkpoint(hadoopOutput + "checkpoint")
val lines = lines1
lines.cache()
val jsonData = lines.map(JSON.parseFull(_))
val mapData = jsonData.filter(_.isDefined)
  .map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
val validMapData = mapData.filter(isValidData(_))
val fields = validMapData.map(data => (data("id").toString,
  timestampToUTCUnix(data("time").toString),
  timestampToUTCUnix(data("local_time").toString),
  data("id2").toString, data("id3").toString,
  data("log_type").toString, data("sub_log_type").toString))
val timeDiff = 3600L
val filteredFields = fields.filter(field => abs(field._2 - field._3) <= timeDiff)
val watchTimeFields = filteredFields.map(fields => (fields._1, fields._2, fields._4, fields._5, fields._7))
val watchTimeTuples = watchTimeFields.map(fields => getWatchtimeTuple(fields))
val programDuids = watchTimeTuples.map(fields => (fields._3, fields._1)).groupByKey(partition)
val programDuidNum = programDuids.map { case (key, value) => (key, value.toSet.size) }
programDuidNum.saveAsTextFiles(hadoopOutput + "result")

I have been working on this for several days, and I have no findings on why there are always 2 executors for the groupBy stage. Thanks a lot! Bill On Fri, Jul 11, 2014 at 1:50 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Can you show us the program that you are running. If you are setting the number of partitions in the XYZ-ByKey operation as 300, then there should be 300 tasks for that stage, distributed over the 50 executors allocated to your context. However, the data distribution may be skewed, in which case you can use a repartition operation to redistribute the data more evenly (both DStream and RDD have repartition). 
TD On Fri, Jul 11, 2014 at 12:22 AM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi Tathagata, I also tried to use the number of partitions as a parameter to functions such as groupByKey. It seems the number of executors is around 50 instead of 300, which is the number of executors I specified in the submission script. Moreover, the running time of different executors is skewed. The ideal case is that Spark can distribute the data into 300 executors evenly so that the computation can be efficiently finished. I am not sure how to achieve this. Thanks! Bill On Thu, Jul 10, 2014 at 5:59 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Can you try setting the number of partitions in all the shuffle-based DStream operations explicitly? It may be the case that the default parallelism (that is, spark.default.parallelism) is not being respected. Regarding the unusual delay, I would look at the task details of that stage in the Spark web UI. It will show the breakdown of time for each task, including GC times, etc. That might give some indication. TD On Thu, Jul 10, 2014 at 5:13 PM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi Tathagata, I set default parallelism as 300 in my configuration file. Sometimes there are more executors in a job. However, it is still slow. And I further observed that most executors take less than 20 seconds but two of them take much longer, such as 2 minutes. The data size is very small (less than 480k lines with only 4 fields). I am not sure why the group by operation takes more than 3 minutes. Thanks! Bill On Thu, Jul 10, 2014 at 4:28 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Are you specifying the number of reducers in all the DStream.ByKey operations? If the reduce by key is not set,
Re: Streaming training@ Spark Summit 2014
Do you have a proxy server? If yes, you need to set the proxy for twitter4j. On Jul 11, 2014, at 7:06 PM, SK skrishna...@gmail.com wrote: I don't get any exceptions or error messages. I tried it both with and without VPN and had the same outcome. But I can try again without VPN later today and report back. thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-training-Spark-Summit-2014-tp9465p9477.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Generic Interface between RDD and DStream
A while ago, I wrote this:

```
package com.virdata.core.compute.common.api

import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.StreamingContext

sealed trait SparkEnvironment extends Serializable {
  type Context
  type Wagon[A]
}
object Batch extends SparkEnvironment {
  type Context = SparkContext
  type Wagon[A] = RDD[A]
}
object Streaming extends SparkEnvironment {
  type Context = StreamingContext
  type Wagon[A] = DStream[A]
}
```

Then I can produce code like this (just an example):

```
package com.virdata.core.compute.common.api

import org.apache.spark.Logging

trait Process[M[_], In, N[_], Out, E <: SparkEnvironment] extends Logging { self =>
  def run(in: M[E#Wagon[In]])(implicit context: E#Context): N[E#Wagon[Out]]

  def pipe[Q[_], U](follow: Process[N, Out, Q, U, E]): Process[M, In, Q, U, E] = new Process[M, In, Q, U, E] {
    override def run(in: M[E#Wagon[In]])(implicit context: E#Context): Q[E#Wagon[U]] = {
      val run1: N[E#Wagon[Out]] = self.run(in)
      follow.run(run1)
    }
  }
}
```

It's not resolving the whole thing, because we'll still have to duplicate the code (for Batch and Streaming). However, when the common traits are there, I'll only have to remove half of the implementations -- without touching the calling side (using them), and thus keeping my plain old backward compat' ^^. I know it's just an intermediate hack, but still ;-) greetz, aℕdy ℙetrella http://about.me/noootsab On Sat, Jul 12, 2014 at 12:57 AM, Tathagata Das tathagata.das1...@gmail.com wrote: I totally agree that if we are able to do this it will be very cool. However, this requires having a common trait / interface between RDD and DStream, which we don't have as of now. It would be very cool though. On my wish list for sure. 
TD On Thu, Jul 10, 2014 at 11:53 AM, mshah shahmaul...@gmail.com wrote: I wanted to get a perspective on how to share code between Spark batch processing and Spark Streaming. For example, I want to get the unique tweets stored in an HDFS file in both Spark batch and Spark Streaming. Currently I will have to do the following: Tweet { String tweetText; String userId; }

Spark Batch:
tweets = sparkContext.newHadoopApiAsFile(tweet);
def getUniqueTweets(tweets: RDD[Tweet]) = {
  tweets.map(tweet => (tweetText, tweet)).groupByKey(tweetText).map((tweetText, _) => tweetText)
}

Spark Streaming:
tweets = streamingContext.fileStream(tweet);
def getUniqueTweets(tweets: DStream[Tweet]) = {
  tweets.map(tweet => (tweetText, tweet)).groupByKey(tweetText).map((tweetText, _) => tweetText)
}

The above example shows I am doing the same thing, but I have to replicate the code as there is no common abstraction between DStream and RDD, or between SparkContext and StreamingContext. If there were a common abstraction it would have been much simpler:

tweets = context.read(tweet, Stream or Batch)
def getUniqueTweets(tweets: SparkObject[Tweet]) = {
  tweets.map(tweet => (tweetText, tweet)).groupByKey(tweetText).map((tweetText, _) => tweetText)
}

I would appreciate thoughts on it. Is it already available? Is there any plan to add this support? Is it intentionally not supported because of a design choice? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Generic-Interface-between-RDD-and-DStream-tp9331.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Streaming training@ Spark Summit 2014
I don't have a proxy server. Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-training-Spark-Summit-2014-tp9465p9481.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Linkage error - duplicate class definition
Facing a funny issue with the Spark class loader. Testing out a basic functionality on a vagrant VM with spark running - looks like it's attempting to ship the jar to a remote instance (in this case local) and somehow is encountering the jar twice? 14/07/11 23:27:59 INFO DAGScheduler: Got job 0 (count at GenerateSEOContent.java:75) with 1 output partitions (allowLocal=false) 14/07/11 23:27:59 INFO DAGScheduler: Final stage: Stage 0(count at GenerateSEOContent.java:75) 14/07/11 23:27:59 INFO DAGScheduler: Parents of final stage: List() 14/07/11 23:27:59 INFO DAGScheduler: Missing parents: List() 14/07/11 23:27:59 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[1] at map at GenerateSEOContent.java:67), which has no missing parents 14/07/11 23:27:59 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 (MappedRDD[1] at map at GenerateSEOContent.java:67) 14/07/11 23:27:59 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks 14/07/11 23:27:59 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on executor localhost: localhost (PROCESS_LOCAL) 14/07/11 23:27:59 INFO TaskSetManager: Serialized task 0.0:0 as 3287 bytes in 4 ms 14/07/11 23:27:59 INFO Executor: Running task ID 0 14/07/11 23:27:59 INFO Executor: Fetching http://10.141.141.10:36365/jars/rickshaw-spark-0.0.1-SNAPSHOT.jar with timestamp 1405121278732 14/07/11 23:27:59 INFO Utils: Fetching http://10.141.141.10:36365/jars/rickshaw-spark-0.0.1-SNAPSHOT.jar to /tmp/fetchFileTemp2298196547032055523.tmp 14/07/11 23:27:59 INFO Executor: Adding file:/tmp/spark-defa5d35-1853-492f-b8e0-e7ac30a370b1/rickshaw-spark-0.0.1-SNAPSHOT.jar to class loader 14/07/11 23:27:59 ERROR Executor: Exception in task ID 0 java.lang.LinkageError: loader (instance of org/apache/spark/executor/ChildExecutorURLClassLoader$userClassLoader$): attempted duplicate class definition for name: com/evocalize/rickshaw/spark/util/HdfsUtil at java.lang.ClassLoader.defineClass1(Native Method) at 
java.lang.ClassLoader.defineClass(ClassLoader.java:800) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at java.net.URLClassLoader.defineClass(URLClassLoader.java:449) at java.net.URLClassLoader.access$100(URLClassLoader.java:71) at java.net.URLClassLoader$1.run(URLClassLoader.java:361) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at org.apache.spark.executor.ChildExecutorURLClassLoader$userClassLoader$.findClass(ExecutorURLClassLoader.scala:42) at org.apache.spark.executor.ChildExecutorURLClassLoader.findClass(ExecutorURLClassLoader.scala:50) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:270) at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:60) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Linkage-error-duplicate-class-definition-tp9482.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Graphx : optimal partitions for a graph and error in logs
Great! Thanks a lot. Hate to say this, but I promise this is the last quickie. I looked at the configurations but I didn't find any parameter to tune for network bandwidth, i.e. is there any way to tell GraphX (Spark) that I'm using a 1G network, a 10G network, or InfiniBand? Does it figure it out on its own and speed up message passing accordingly? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Graphx-optimal-partitions-for-a-graph-and-error-in-logs-tp9455p9483.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
spark ui on yarn
I just tested a long-lived application (that we normally run in standalone mode) on YARN in client mode. It looks to me like cached RDDs are missing in the storage tab of the UI. Accessing the RDD storage information via the Spark context shows the RDDs as fully cached, but they are missing on the storage page. Spark 1.0.0.
ML classifier and data format for dataset with variable number of features
Hi, I need to perform binary classification on an image dataset. Each image is a data point described by a JSON object. The feature set for each image is a set of feature vectors, each feature vector corresponding to a distinct object in the image. For example, if an image has 5 objects, its feature set will have 5 feature vectors, whereas an image that has 3 objects will have a feature set consisting of 3 feature vectors. So the number of feature vectors may be different for different images, although each feature vector has the same number of attributes. The classification depends on the features of the individual objects, so I cannot aggregate them all into a flat vector. I have looked through the MLlib examples and it appears that the libSVM data format and the LabeledData format that MLlib uses require all the points to have the same number of features, and they read in a flat feature vector. I would like to know if any of the MLlib supervised learning classifiers can be used with the JSON data format and whether they can be used to classify points with a different number of features as described above. Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/ML-classifier-and-data-format-for-dataset-with-variable-number-of-features-tp9486.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Graphx : optimal partitions for a graph and error in logs
Spark just opens up inter-slave TCP connections for message passing during shuffles (I think the relevant code is in ConnectionManager). Since TCP automatically determines the optimal sending rate (see http://en.wikipedia.org/wiki/TCP_congestion-avoidance_algorithm), Spark doesn't need any configuration parameters for this. Ankur http://www.ankurdave.com/
Re: Graphx : optimal partitions for a graph and error in logs
Perfect! Thanks Ankur. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Graphx-optimal-partitions-for-a-graph-and-error-in-logs-tp9455p9488.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: ML classifier and data format for dataset with variable number of features
You can load the dataset as an RDD of JSON objects and use a flatMap to extract feature vectors at the object level. Then you can filter the training examples you want for binary classification. If you want to try multiclass, check out DB's PR at https://github.com/apache/spark/pull/1379 Best, Xiangrui On Fri, Jul 11, 2014 at 5:12 PM, SK skrishna...@gmail.com wrote: Hi, I need to perform binary classification on an image dataset. Each image is a data point described by a JSON object. The feature set for each image is a set of feature vectors, each feature vector corresponding to a distinct object in the image. For example, if an image has 5 objects, its feature set will have 5 feature vectors, whereas an image that has 3 objects will have a feature set consisting of 3 feature vectors. So the number of feature vectors may be different for different images, although each feature vector has the same number of attributes. The classification depends on the features of the individual objects, so I cannot aggregate them all into a flat vector. I have looked through the MLlib examples and it appears that the libSVM data format and the LabeledData format that MLlib uses require all the points to have the same number of features, and they read in a flat feature vector. I would like to know if any of the MLlib supervised learning classifiers can be used with the JSON data format and whether they can be used to classify points with a different number of features as described above. Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/ML-classifier-and-data-format-for-dataset-with-variable-number-of-features-tp9486.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
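[Editor's note] Xiangrui's suggestion — flatMap from image-level JSON records down to object-level feature vectors, so every training example ends up with a fixed-length vector — can be sketched without Spark using only the stdlib `json` module. The field names `label` and `objects` are made up for illustration; in PySpark the same shape would be a `map(json.loads)` followed by a `flatMap`:

```python
import json

# Each record is one image; "objects" holds a variable number of
# fixed-length feature vectors. Field names are hypothetical.
records = [
    '{"label": 1, "objects": [[0.1, 0.2], [0.3, 0.4], [0.5, 0.6]]}',
    '{"label": 0, "objects": [[0.7, 0.8]]}',
]

# The flatMap step: emit one (label, vector) example per OBJECT, not per
# image, so every example has the same number of features.
examples = [
    (img["label"], vec)
    for img in (json.loads(r) for r in records)
    for vec in img["objects"]
]

print(len(examples))  # 4 examples from 2 images
print(examples[0])    # (1, [0.1, 0.2])
```

After this flattening, each example is a fixed-width vector, which is the shape MLlib's classifiers expect.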
Announcing Spark 1.0.1
I am happy to announce the availability of Spark 1.0.1! This release includes contributions from 70 developers. Spark 1.0.1 includes fixes across several areas of Spark, including the core API, PySpark, and MLlib. It also includes new features in Spark's (alpha) SQL library, including support for JSON data and performance and stability fixes. Visit the release notes[1] to read about this release or download[2] the release today. [1] http://spark.apache.org/releases/spark-release-1-0-1.html [2] http://spark.apache.org/downloads.html
Re: Streaming training@ Spark Summit 2014
Try running a simple standalone program if you are using Scala, and see if you are getting any data. I use this to debug any connection/twitter4j issues. import twitter4j._ //put your keys and creds here object Util { val config = new twitter4j.conf.ConfigurationBuilder() .setOAuthConsumerKey() .setOAuthConsumerSecret() .setOAuthAccessToken() .setOAuthAccessTokenSecret() .build } /** * Add this to your build.sbt * "org.twitter4j" % "twitter4j-stream" % "3.0.3", */ object SimpleStreamer extends App { def simpleStatusListener = new StatusListener() { def onStatus(status: Status) { println(status.getUserMentionEntities.length) } def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {} def onTrackLimitationNotice(numberOfLimitedStatuses: Int) {} def onException(ex: Exception) { ex.printStackTrace } def onScrubGeo(arg0: Long, arg1: Long) {} def onStallWarning(warning: StallWarning) {} } val keywords = List("dog", "cat") val twitterStream = new TwitterStreamFactory(Util.config).getInstance twitterStream.addListener(simpleStatusListener) twitterStream.filter(new FilterQuery().track(keywords.toArray)) } On Fri, Jul 11, 2014 at 7:19 PM, SK skrishna...@gmail.com wrote: I don't have a proxy server. Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-training-Spark-Summit-2014-tp9465p9481.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Streaming training@ Spark Summit 2014
Does nothing get printed on the screen? If you are not getting any tweets but Spark Streaming is running successfully, you should at least get counts printed every batch (which would be zero). If they are not being printed either, check the Spark web UI to see whether stages are running or not. If they are not, you may not have enough cores to run. TD On Fri, Jul 11, 2014 at 7:09 PM, Soumya Simanta soumya.sima...@gmail.com wrote: Try running a simple standalone program if you are using Scala, and see if you are getting any data. I use this to debug any connection/twitter4j issues. import twitter4j._ //put your keys and creds here object Util { val config = new twitter4j.conf.ConfigurationBuilder() .setOAuthConsumerKey() .setOAuthConsumerSecret() .setOAuthAccessToken() .setOAuthAccessTokenSecret() .build } /** * Add this to your build.sbt * "org.twitter4j" % "twitter4j-stream" % "3.0.3", */ object SimpleStreamer extends App { def simpleStatusListener = new StatusListener() { def onStatus(status: Status) { println(status.getUserMentionEntities.length) } def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {} def onTrackLimitationNotice(numberOfLimitedStatuses: Int) {} def onException(ex: Exception) { ex.printStackTrace } def onScrubGeo(arg0: Long, arg1: Long) {} def onStallWarning(warning: StallWarning) {} } val keywords = List("dog", "cat") val twitterStream = new TwitterStreamFactory(Util.config).getInstance twitterStream.addListener(simpleStatusListener) twitterStream.filter(new FilterQuery().track(keywords.toArray)) } On Fri, Jul 11, 2014 at 7:19 PM, SK skrishna...@gmail.com wrote: I don't have a proxy server. Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-training-Spark-Summit-2014-tp9465p9481.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: How are the executors used in Spark Streaming in terms of receiver and driver program?
A task slot is equivalent to a core: one core can only run one task at a time. TD On Fri, Jul 11, 2014 at 1:57 PM, Yan Fang yanfang...@gmail.com wrote: Hi Tathagata, Thank you. Is a task slot equivalent to a core? Or can one core actually run multiple tasks at the same time? Best, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108 On Fri, Jul 11, 2014 at 1:45 PM, Tathagata Das tathagata.das1...@gmail.com wrote: The same executor can be used for both receiving and processing, irrespective of the deployment mode (YARN, Spark standalone, etc.). It boils down to the number of cores / task slots that the executor has. Each receiver is like a long-running task, so each of them occupies a slot. If there are free slots in the executor then other tasks can be run on them. So if you are finding that no other tasks are being run, check how many cores/task slots the executor has and whether there are more task slots than the number of input DStreams / receivers you are launching. @Praveen your answers were pretty much spot on, thanks for chipping in! On Fri, Jul 11, 2014 at 11:16 AM, Yan Fang yanfang...@gmail.com wrote: Hi Praveen, Thank you for the answer. That's interesting, because if I only bring up one executor for Spark Streaming, it seems only the receiver is working and no other tasks are happening, judging by the log and UI. Maybe it's just because the receiving task eats all the resources, not because one executor can only run one receiver? Fang, Yan yanfang...@gmail.com +1 (206) 849-4108 On Fri, Jul 11, 2014 at 6:06 AM, Praveen Seluka psel...@qubole.com wrote: Here are my answers. But I am just getting started with Spark Streaming - so please correct me if I am wrong. 1) Yes 2) Receivers will run on executors. It's actually a job that's submitted, where the # of tasks equals the # of receivers. An executor can actually run more than one task at the same time. Hence you could have more receivers than executors, but it's not recommended I think. 
3) As said in 2, the executor where the receiver task is running can be used for map/reduce tasks. In yarn-cluster mode, the driver program actually runs as the application master (it lives in the first container that's launched), and this is not an executor - hence it's not used for other operations. 4) The driver runs in a separate container. I think the same executor can be used for the receiver and the processing task as well (this part I am not very sure about). On Fri, Jul 11, 2014 at 12:29 AM, Yan Fang yanfang...@gmail.com wrote: Hi all, I am working to improve the parallelism of my Spark Streaming application, but I have a problem understanding how the executors are used and how the application is distributed. 1. In YARN, is one executor equal to one container? 2. I saw the statement that a streaming receiver runs on one worker machine (note that each input DStream creates a single receiver (running on a worker machine) that receives a single stream of data). Does the worker machine mean the executor or the physical machine? If I have more receivers than executors, will it still work? 3. Is the executor that holds the receiver also used for other operations, such as map and reduce, or is it fully occupied by the receiver? Similarly, if I run in yarn-cluster mode, is the executor running the driver program used by other operations too? 4. So if I have a driver program (cluster mode) and a streaming receiver, do I have to have at least 2 executors, because the program and the streaming receiver have to be on different executors? Thank you. Sorry for having so many questions, but I do want to understand how Spark Streaming distributes work in order to assign reasonable resources. Thank you again. Best, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108
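[Editor's note] The slot accounting described in this thread reduces to simple arithmetic: each receiver permanently occupies one task slot (core), and only the remaining slots are free for map/reduce tasks. A sketch with made-up numbers (not a Spark API):

```python
def free_task_slots(num_executors, cores_per_executor, num_receivers):
    """Slots left for processing after each receiver takes one core."""
    total = num_executors * cores_per_executor
    if num_receivers >= total:
        # Receivers occupy every slot: nothing left for map/reduce tasks,
        # which matches Yan's observation that "only the receiver is working".
        raise ValueError("receivers occupy every slot; no tasks can run")
    return total - num_receivers

print(free_task_slots(num_executors=2, cores_per_executor=2, num_receivers=1))  # 3
```

With a single one-core executor and one receiver, `free_task_slots(1, 1, 1)` raises: the receiver starves the processing, so at least one extra core is needed.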
Re: Number of executors change during job running
Aah, I get it now. That is because the input data stream is replicated on two machines, so by locality the data is processed on those two machines. So the map stage on the data uses 2 executors, but for the reduce stage (after groupByKey), the saveAsTextFiles would use 300 tasks. The default parallelism takes effect only when the data is explicitly shuffled around. You can fix this by explicitly repartitioning the data: inputDStream.repartition(partitions). This is covered in the streaming tuning guide http://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving . TD On Fri, Jul 11, 2014 at 4:11 PM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi folks, I just ran another job that only received data from Kafka, did some filtering, and then saved the results as text files in HDFS. There was no reduce work involved. Surprisingly, the number of executors for the saveAsTextFiles stage was also 2, although I specified 300 executors in the job submission. As a result, the simple save-file action took more than 2 minutes. Do you have any idea how Spark determines the number of executors for different stages? Thanks! Bill On Fri, Jul 11, 2014 at 2:01 PM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi Tathagata, Below is my main function. I omit some filtering and data conversion functions. These functions are just one-to-one mappings, which should not noticeably increase running time. The only reduce function I have here is groupByKey. There are 4 topics in my Kafka brokers; two of the topics have 240k lines each minute, and the other two topics have less than 30k lines per minute. The batch size is one minute and I specified 300 executors in my spark-submit script. The default parallelism is 300. 
val partition = 300 val zkQuorum = "zk1,zk2,zk3" val group = "my-group-" + currentTime.toString val topics = "topic1,topic2,topic3,topic4" val numThreads = 4 val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap ssc = new StreamingContext(conf, Seconds(batch)) ssc.checkpoint(hadoopOutput + "checkpoint") val lines = lines1 lines.cache() val jsonData = lines.map(JSON.parseFull(_)) val mapData = jsonData.filter(_.isDefined) .map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]]) val validMapData = mapData.filter(isValidData(_)) val fields = validMapData.map(data => (data("id").toString, timestampToUTCUnix(data("time").toString), timestampToUTCUnix(data("local_time").toString), data("id2").toString, data("id3").toString, data("log_type").toString, data("sub_log_type").toString)) val timeDiff = 3600L val filteredFields = fields.filter(field => abs(field._2 - field._3) <= timeDiff) val watchTimeFields = filteredFields.map(fields => (fields._1, fields._2, fields._4, fields._5, fields._7)) val watchTimeTuples = watchTimeFields.map(fields => getWatchtimeTuple(fields)) val programDuids = watchTimeTuples.map(fields => (fields._3, fields._1)).groupByKey(partition) val programDuidNum = programDuids.map { case (key, value) => (key, value.toSet.size) } programDuidNum.saveAsTextFiles(hadoopOutput + "result") I have been working on this for several days. There are no findings on why there are always 2 executors for the groupBy stage. Thanks a lot! Bill On Fri, Jul 11, 2014 at 1:50 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Can you show us the program that you are running. If you are setting the number of partitions in the XYZ-ByKey operation as 300, then there should be 300 tasks for that stage, distributed over the 50 executors allocated to your context. However, the data distribution may be skewed, in which case you can use a repartition operation to redistribute the data more evenly (both DStream and RDD have repartition). 
TD On Fri, Jul 11, 2014 at 12:22 AM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi Tathagata, I also tried to pass the number of partitions as a parameter to functions such as groupByKey. It seems the number of executors is around 50 instead of 300, which is the number of executors I specified in the submission script. Moreover, the running time of different executors is skewed. The ideal case is that Spark can distribute the data onto 300 executors evenly so that the computation can be finished efficiently. I am not sure how to achieve this. Thanks! Bill On Thu, Jul 10, 2014 at 5:59 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Can you try setting the number of partitions in all the shuffle-based DStream operations explicitly? It may be the case that the default parallelism (that is, spark.default.parallelism) is not being respected. Regarding the unusual delay, I would look at the task details of that stage in the Spark web UI. It will show the breakdown of time for each task, including GC times, etc. That might give some indication. TD
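[Editor's note] Why does the explicit repartition TD recommends help? Received blocks live only on the two receiver machines, so locality-aware scheduling puts all map tasks there; an explicit shuffle spreads the records over every partition, so every executor gets work. The redistribution itself is just round-robin/hash assignment, which can be sketched in plain Python (not Spark itself):

```python
def repartition(records, num_partitions):
    """Round-robin redistribution, analogous in effect to
    DStream.repartition(n): spread records over n partitions."""
    parts = [[] for _ in range(num_partitions)]
    for i, rec in enumerate(records):
        parts[i % num_partitions].append(rec)
    return parts

# 12 records that arrived on only 2 receivers end up spread over 6
# partitions, so 6 tasks (instead of 2) can work on them in parallel.
received = list(range(12))
parts = repartition(received, 6)
print([len(p) for p in parts])  # [2, 2, 2, 2, 2, 2]
```

The cost is one extra shuffle of the raw input, which is why the tuning guide recommends it only when the receiving parallelism is the bottleneck.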
Re: pyspark sc.parallelize running OOM with smallish data
I put the same dataset into Scala (using spark-shell) and it acts weird. I cannot do a count on it; the executors seem to hang. The web UI shows 0/96 in the status bar and shows details about the worker nodes, but there is no progress. sc.parallelize does finish (takes too long for the data size) in Scala. On Fri, Jul 11, 2014 at 2:00 PM, Mohit Jaggi mohitja...@gmail.com wrote: spark_data_array here has about 35k rows with 4k columns. I have 4 nodes in the cluster and gave 48g to executors. Also tried Kryo serialization. Traceback (most recent call last): File "/mohit/./m.py", line 58, in <module> spark_data = sc.parallelize(spark_data_array) File "/mohit/spark/python/pyspark/context.py", line 265, in parallelize jrdd = readRDDFromFile(self._jsc, tempFile.name, numSlices) File "/mohit/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 537, in __call__ File "/mohit/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py", line 300, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.readRDDFromFile. : java.lang.OutOfMemoryError: Java heap space at org.apache.spark.api.python.PythonRDD$.readRDDFromFile(PythonRDD.scala:279) at org.apache.spark.api.python.PythonRDD.readRDDFromFile(PythonRDD.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) at py4j.Gateway.invoke(Gateway.java:259) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:207) at java.lang.Thread.run(Thread.java:745)
Re: Spark Streaming timing considerations
This is not in the current Streaming API. Queue stream is useful for testing with generated RDDs, but not for actual data. For an actual data stream, the slack time can be implemented by doing DStream.window over a larger window that takes the slack time into consideration, and then filtering out the required application-time-based window of data. For example, if you want a slack time of 1 minute and batches of 10 seconds, then do a window operation of 70 seconds, and in each RDD filter out the records with the desired application time and process them. TD On Fri, Jul 11, 2014 at 7:44 AM, Laeeq Ahmed laeeqsp...@yahoo.com wrote: Hi, In the Spark Streaming paper, slack time has been suggested for delaying batch creation in the case of external timestamps. I don't see any such option in StreamingContext. Is it available in the API? Also, going through the previous posts, queueStream has been suggested for this. I looked into the queueStream example. // Create and push some RDDs into Queue for (i <- 1 to 30) { rddQueue += ssc.sparkContext.makeRDD(1 to 10) Thread.sleep(1000) } The only thing I am unsure of is how to make batches (basic RDDs) out of a stream coming in on a port. Regards, Laeeq
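[Editor's note] TD's recipe — window larger than the batch by the slack time, then filter on each record's own application timestamp — can be sketched with plain Python. Here records carry an `event_time` field (a made-up name for illustration); for a 10-second application-time batch with 60 seconds of slack, you would keep a 70-second window and select only the records whose event time falls in the target batch:

```python
def app_time_batch(windowed_records, batch_start, batch_len=10):
    """Filter a wide (e.g. 70s) window down to the records belonging to
    one 10s application-time batch; the extra width is slack for late data."""
    return [r for r in windowed_records
            if batch_start <= r["event_time"] < batch_start + batch_len]

# A window of records, some arriving late relative to their event time.
window = [
    {"event_time": 100, "value": "a"},
    {"event_time": 105, "value": "b"},   # late arrival, still inside slack
    {"event_time": 112, "value": "c"},   # belongs to the next batch
]
print([r["value"] for r in app_time_batch(window, batch_start=100)])  # ['a', 'b']
```

In Spark Streaming the same filter would run inside `DStream.window(Seconds(70), Seconds(10)).transform(...)`, once per batch interval.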
Re: Generic Interface between RDD and DStream
Hey Andy, That's pretty cool!! Is there a github repo where you can share this piece of code for us to play around with? If we can come up with a simple enough general pattern, that can be very useful! TD On Fri, Jul 11, 2014 at 4:12 PM, andy petrella andy.petre...@gmail.com wrote: A while ago, I wrote this: ``` package com.virdata.core.compute.common.api import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.StreamingContext sealed trait SparkEnvironment extends Serializable { type Context type Wagon[A] } object Batch extends SparkEnvironment { type Context = SparkContext type Wagon[A] = RDD[A] } object Streaming extends SparkEnvironment { type Context = StreamingContext type Wagon[A] = DStream[A] } ``` Then I can produce code like this (just an example) ``` package com.virdata.core.compute.common.api import org.apache.spark.Logging trait Process[M[_], In, N[_], Out, E <: SparkEnvironment] extends Logging { self => def run(in: M[E#Wagon[In]])(implicit context: E#Context): N[E#Wagon[Out]] def pipe[Q[_], U](follow: Process[N, Out, Q, U, E]): Process[M, In, Q, U, E] = new Process[M, In, Q, U, E] { override def run(in: M[E#Wagon[In]])(implicit context: E#Context): Q[E#Wagon[U]] = { val run1: N[E#Wagon[Out]] = self.run(in) follow.run(run1) } } } ``` It's not resolving the whole thing, because we'll still have to duplicate the code (for Batch and Streaming). However, when the common traits are there, I'll only have to remove half of the implementations -- without touching the calling side (using them), and thus keeping my plain old backward compat' ^^. I know it's just an intermediate hack, but still ;-) greetz, aℕdy ℙetrella about.me/noootsab [image: aℕdy ℙetrella on about.me] http://about.me/noootsab On Sat, Jul 12, 2014 at 12:57 AM, Tathagata Das tathagata.das1...@gmail.com wrote: I totally agree that if we are able to do this it will be very cool. 
However, this requires having a common trait / interface between RDD and DStream, which we don't have as of now. It would be very cool though. On my wish list for sure. TD On Thu, Jul 10, 2014 at 11:53 AM, mshah shahmaul...@gmail.com wrote: I wanted to get a perspective on how to share code between Spark batch processing and Spark Streaming. For example, I want to get the unique tweets stored in an HDFS file, in both Spark batch and Spark Streaming. Currently I will have to do the following: Tweet { String tweetText; String userId; } Spark Batch: tweets = sparkContext.newHadoopApiAsFile(tweet); def getUniqueTweets(tweets: RDD[Tweet]) = { tweets.map(tweet => (tweet.tweetText, tweet)).groupByKey().map { case (tweetText, _) => tweetText } } Spark Streaming: tweets = streamingContext.fileStream(tweet); def getUniqueTweets(tweets: DStream[Tweet]) = { tweets.map(tweet => (tweet.tweetText, tweet)).groupByKey().map { case (tweetText, _) => tweetText } } The above example shows I am doing the same thing, but I have to replicate the code as there is no common abstraction between DStream and RDD, or between SparkContext and StreamingContext. If there was a common abstraction it would have been much simpler: tweets = context.read(tweet, Stream or Batch) def getUniqueTweets(tweets: SparkObject[Tweet]) = { tweets.map(tweet => (tweet.tweetText, tweet)).groupByKey().map { case (tweetText, _) => tweetText } } I would appreciate thoughts on it. Is it already available? Is there any plan to add this support? Is it intentionally not supported because of a design choice? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Generic-Interface-between-RDD-and-DStream-tp9331.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Number of executors change during job running
Hi Tathagata, Do you mean that the data is not shuffled until the reduce stage? That means groupBy still only uses 2 machines? I think I used repartition(300) after I read the data from Kafka into the DStream. It seems that it did not guarantee that the map or reduce stages would run on 300 machines. I am currently trying to initiate 100 DStreams from KafkaUtils.createStream and union them. Now the reduce stages have around 80 machines for all the batches. However, this method will introduce many DStreams. It would be good if we could control the number of executors in the groupBy operation, because the calculation needs to be finished within 1 minute for different sizes of input data, based on our production needs. Thanks! Bill On Fri, Jul 11, 2014 at 7:29 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Aah, I get it now. That is because the input data stream is replicated on two machines, so by locality the data is processed on those two machines. So the map stage on the data uses 2 executors, but for the reduce stage (after groupByKey), the saveAsTextFiles would use 300 tasks. The default parallelism takes effect only when the data is explicitly shuffled around. You can fix this by explicitly repartitioning the data: inputDStream.repartition(partitions). This is covered in the streaming tuning guide http://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving . TD On Fri, Jul 11, 2014 at 4:11 PM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi folks, I just ran another job that only received data from Kafka, did some filtering, and then saved the results as text files in HDFS. There was no reduce work involved. Surprisingly, the number of executors for the saveAsTextFiles stage was also 2, although I specified 300 executors in the job submission. As a result, the simple save-file action took more than 2 minutes. Do you have any idea how Spark determines the number of executors for different stages? Thanks! 
Bill On Fri, Jul 11, 2014 at 2:01 PM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi Tathagata, Below is my main function. I omit some filtering and data conversion functions. These functions are just one-to-one mappings, which should not noticeably increase running time. The only reduce function I have here is groupByKey. There are 4 topics in my Kafka brokers; two of the topics have 240k lines each minute, and the other two topics have less than 30k lines per minute. The batch size is one minute and I specified 300 executors in my spark-submit script. The default parallelism is 300. val partition = 300 val zkQuorum = "zk1,zk2,zk3" val group = "my-group-" + currentTime.toString val topics = "topic1,topic2,topic3,topic4" val numThreads = 4 val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap ssc = new StreamingContext(conf, Seconds(batch)) ssc.checkpoint(hadoopOutput + "checkpoint") val lines = lines1 lines.cache() val jsonData = lines.map(JSON.parseFull(_)) val mapData = jsonData.filter(_.isDefined) .map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]]) val validMapData = mapData.filter(isValidData(_)) val fields = validMapData.map(data => (data("id").toString, timestampToUTCUnix(data("time").toString), timestampToUTCUnix(data("local_time").toString), data("id2").toString, data("id3").toString, data("log_type").toString, data("sub_log_type").toString)) val timeDiff = 3600L val filteredFields = fields.filter(field => abs(field._2 - field._3) <= timeDiff) val watchTimeFields = filteredFields.map(fields => (fields._1, fields._2, fields._4, fields._5, fields._7)) val watchTimeTuples = watchTimeFields.map(fields => getWatchtimeTuple(fields)) val programDuids = watchTimeTuples.map(fields => (fields._3, fields._1)).groupByKey(partition) val programDuidNum = programDuids.map { case (key, value) => (key, value.toSet.size) } programDuidNum.saveAsTextFiles(hadoopOutput + "result") I have been working on this for several days. 
There are no findings on why there are always 2 executors for the groupBy stage. Thanks a lot! Bill On Fri, Jul 11, 2014 at 1:50 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Can you show us the program that you are running. If you are setting the number of partitions in the XYZ-ByKey operation as 300, then there should be 300 tasks for that stage, distributed over the 50 executors allocated to your context. However, the data distribution may be skewed, in which case you can use a repartition operation to redistribute the data more evenly (both DStream and RDD have repartition). TD On Fri, Jul 11, 2014 at 12:22 AM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi Tathagata, I also tried to pass the number of partitions as a parameter to functions such as groupByKey. It seems the number of executors is around 50 instead of 300, which
Re: Announcing Spark 1.0.1
Congrats to the Spark community! On Friday, July 11, 2014, Patrick Wendell pwend...@gmail.com wrote: I am happy to announce the availability of Spark 1.0.1! This release includes contributions from 70 developers. Spark 1.0.1 includes fixes across several areas of Spark, including the core API, PySpark, and MLlib. It also includes new features in Spark's (alpha) SQL library, including support for JSON data and performance and stability fixes. Visit the release notes [1] to read about this release or download [2] the release today. [1] http://spark.apache.org/releases/spark-release-1-0-1.html [2] http://spark.apache.org/downloads.html
RE: EC2 Cluster script. Shark install fails
Thanks Michael. Missed that point, as well as the integration of SQL within the Scala shell (by setting up the SQLContext). Looking forward to feature parity in future releases (Shark -> Spark SQL). Cheers. From: mich...@databricks.com Date: Thu, 10 Jul 2014 16:20:20 -0700 Subject: Re: EC2 Cluster script. Shark install fails To: user@spark.apache.org There is no version of Shark that is compatible with Spark 1.0; however, Spark SQL does come included automatically. More information here: http://databricks.com/blog/2014/07/01/shark-spark-sql-hive-on-spark-and-the-future-of-sql-on-spark.html http://spark.apache.org/docs/latest/sql-programming-guide.html On Thu, Jul 10, 2014 at 5:51 AM, Jason H jas...@developer.net.nz wrote: Hi, Just going through the process of installing Spark 1.0.0 on EC2 and noticed that the script throws an error when installing Shark: Unpacking Spark ~/spark-ec2 Initializing shark ~ ~/spark-ec2 ERROR: Unknown Shark version The install completes in the end but Shark is completely missed. Looking for info on the best way to manually add this in now that the cluster is set up. Is there no Shark version compatible with 1.0.0 or this script? Any suggestions appreciated.
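For readers landing here from Shark: the Spark SQL flow Michael alludes to looks roughly like this in a Spark 1.0.x shell. This is a hedged sketch, assuming an existing SparkContext `sc`; the case class, table name, and file path are made up for illustration.

```scala
// Spark 1.0-era SQL usage in place of Shark (API as of 1.0.x;
// registerAsTable was later renamed registerTempTable).
import org.apache.spark.sql.SQLContext

case class Person(name: String, age: Int)  // hypothetical schema

val sqlContext = new SQLContext(sc)
import sqlContext.createSchemaRDD  // implicit RDD -> SchemaRDD conversion

val people = sc.textFile("people.txt")  // illustrative path
  .map(_.split(","))
  .map(p => Person(p(0), p(1).trim.toInt))

people.registerAsTable("people")
val adults = sqlContext.sql("SELECT name FROM people WHERE age >= 18")
adults.collect().foreach(println)
```

Unlike Shark, nothing extra needs to be installed on the EC2 cluster; the SQL module ships inside the Spark assembly.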
Re: One question about RDD.zip function when trying Naive Bayes
I tried my test case with Spark 1.0.1 and saw the same result (27 pairs becomes 25 pairs after zip). Could someone please check it? Regards, xj On Thu, Jul 3, 2014 at 2:31 PM, Xiangrui Meng men...@gmail.com wrote: This is due to a bug in sampling, which was fixed in 1.0.1 and latest master. See https://github.com/apache/spark/pull/1234 . -Xiangrui On Wed, Jul 2, 2014 at 8:23 PM, x wasedax...@gmail.com wrote: Hello, I am a newbie to Spark MLlib and ran into a curious case when following the instruction at the page below. http://spark.apache.org/docs/latest/mllib-naive-bayes.html I ran a test program on my local machine using some data. val spConfig = (new SparkConf).setMaster("local").setAppName("SparkNaiveBayes") val sc = new SparkContext(spConfig) The test data was as follows and there were three labeled categories I wanted to predict. 1 LabeledPoint(0.0, [4.9,3.0,1.4,0.2]) 2 LabeledPoint(0.0, [4.6,3.4,1.4,0.3]) 3 LabeledPoint(0.0, [5.7,4.4,1.5,0.4]) 4 LabeledPoint(0.0, [5.2,3.4,1.4,0.2]) 5 LabeledPoint(0.0, [4.7,3.2,1.6,0.2]) 6 LabeledPoint(0.0, [4.8,3.1,1.6,0.2]) 7 LabeledPoint(0.0, [5.1,3.8,1.9,0.4]) 8 LabeledPoint(0.0, [4.8,3.0,1.4,0.3]) 9 LabeledPoint(0.0, [5.0,3.3,1.4,0.2]) 10 LabeledPoint(1.0, [6.6,2.9,4.6,1.3]) 11 LabeledPoint(1.0, [5.2,2.7,3.9,1.4]) 12 LabeledPoint(1.0, [5.6,2.5,3.9,1.1]) 13 LabeledPoint(1.0, [6.4,2.9,4.3,1.3]) 14 LabeledPoint(1.0, [6.6,3.0,4.4,1.4]) 15 LabeledPoint(1.0, [6.0,2.7,5.1,1.6]) 16 LabeledPoint(1.0, [5.5,2.6,4.4,1.2]) 17 LabeledPoint(1.0, [5.8,2.6,4.0,1.2]) 18 LabeledPoint(1.0, [5.7,2.9,4.2,1.3]) 19 LabeledPoint(1.0, [5.7,2.8,4.1,1.3]) 20 LabeledPoint(2.0, [6.3,2.9,5.6,1.8]) 21 LabeledPoint(2.0, [6.5,3.0,5.8,2.2]) 22 LabeledPoint(2.0, [6.5,3.0,5.5,1.8]) 23 LabeledPoint(2.0, [6.7,3.3,5.7,2.1]) 24 LabeledPoint(2.0, [7.4,2.8,6.1,1.9]) 25 LabeledPoint(2.0, [6.3,3.4,5.6,2.4]) 26 LabeledPoint(2.0, [6.0,3.0,4.8,1.8]) 27 LabeledPoint(2.0, [6.8,3.2,5.9,2.3]) The predicted result via NaiveBayes is below.
Compared to the test data, only two predicted results (#11 and #15) were different. 1 0.0 2 0.0 3 0.0 4 0.0 5 0.0 6 0.0 7 0.0 8 0.0 9 0.0 10 1.0 11 2.0 12 1.0 13 1.0 14 1.0 15 2.0 16 1.0 17 1.0 18 1.0 19 1.0 20 2.0 21 2.0 22 2.0 23 2.0 24 2.0 25 2.0 26 2.0 27 2.0 After pairing the test RDD with the predicted RDD via zip I got this. 1 (0.0,0.0) 2 (0.0,0.0) 3 (0.0,0.0) 4 (0.0,0.0) 5 (0.0,0.0) 6 (0.0,0.0) 7 (0.0,0.0) 8 (0.0,0.0) 9 (0.0,1.0) 10 (0.0,1.0) 11 (0.0,1.0) 12 (1.0,1.0) 13 (1.0,1.0) 14 (2.0,1.0) 15 (1.0,1.0) 16 (1.0,2.0) 17 (1.0,2.0) 18 (1.0,2.0) 19 (1.0,2.0) 20 (2.0,2.0) 21 (2.0,2.0) 22 (2.0,2.0) 23 (2.0,2.0) 24 (2.0,2.0) 25 (2.0,2.0) I expected 27 pairs but two results were lost, and the remaining pairs are misaligned. Could someone please point out what I missed here? Regards, xj
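A note for anyone hitting the same symptom: zip silently assumes both RDDs have the same number of partitions and the same number of elements per partition, so a count check catches this class of bug before it corrupts the comparison. The real fix here is upgrading past the sampling fix in PR #1234, but a defensive sketch (assuming `testData: RDD[LabeledPoint]` and a trained NaiveBayes `model`) looks like:

```scala
// Verify the two sides of the zip actually line up before trusting it.
val predictions = testData.map(p => model.predict(p.features))

val expected = testData.count()
val actual = predictions.count()
require(expected == actual, s"count mismatch: $expected vs $actual")

// Pair each true label with its prediction and compute accuracy.
val paired = testData.map(_.label).zip(predictions)
val accuracy =
  paired.filter { case (label, pred) => label == pred }.count().toDouble / expected
```

In the 27-point example above this require would fail (27 vs 25), flagging the sampling bug instead of producing the shifted pairs shown.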