Null array of cols
I am trying to understand the best way to handle the scenario where a null/empty array "[]" is passed in. Can somebody suggest a way to filter out such records? I've tried numerous things, including dataframe.head().isEmpty, but PySpark doesn't recognize isEmpty even though I see it in the API docs. pyspark.sql.utils.AnalysisException: u"cannot resolve '`timestamp`' given input columns: []; line 1 pos 0;\n'Filter isnotnull('timestamp)\n+- LogicalRDD\n"
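A minimal Java sketch of one way to guard against this, assuming Spark 2.x and a hypothetical events.json input; an input that is just "[]" produces a DataFrame with no columns at all, so any filter on "timestamp" fails to resolve, and the guard below checks for the column (and for emptiness) before filtering. In PySpark the analogous checks would be "timestamp" in df.columns and df.rdd.isEmpty().

import java.util.Arrays;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SkipEmptyInput {
  public static void main(String[] args) {
    // local master only for the sketch; normally supplied by spark-submit
    SparkSession spark = SparkSession.builder().master("local[*]").appName("SkipEmptyInput").getOrCreate();
    // "events.json" is a hypothetical path
    Dataset<Row> df = spark.read().json("events.json");
    boolean hasTimestamp = Arrays.asList(df.columns()).contains("timestamp");
    if (!hasTimestamp || df.javaRDD().isEmpty()) {
      System.out.println("Empty or schema-less batch, skipping");
      return;
    }
    Dataset<Row> valid = df.filter(df.col("timestamp").isNotNull());
    valid.show();
  }
}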
Spark directory partition name
When Spark writes a partition it writes it in the format <key>=<value>. Is there a way to have Spark write the directory name using only the <value>?
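A minimal Java sketch of the default behaviour (Spark 2.x API assumed; paths and the "date" partition column are hypothetical). As far as I know there is no built-in option to drop the key= prefix from the Hive-style directory names, so the directories would have to be renamed after the write if only the value is wanted.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class PartitionedWrite {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("PartitionedWrite").getOrCreate();
    Dataset<Row> events = spark.read().parquet("hdfs:///data/events");  // hypothetical input
    // Hive-style layout: one directory per distinct value, named <key>=<value>,
    // e.g. hdfs:///data/out/date=2016-06-08/part-...
    events.write()
          .mode(SaveMode.Overwrite)
          .partitionBy("date")
          .parquet("hdfs:///data/out");
  }
}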
Re: Write Ahead Log
Is there any specific reason why this feature is only supported in streaming? On Wed, Jun 8, 2016 at 3:24 PM, Ted Yu <yuzhih...@gmail.com> wrote: > There was a minor typo in the name of the config: > > spark.streaming.receiver.writeAheadLog.enable > > Yes, it only applies to Streaming. > > On Wed, Jun 8, 2016 at 3:14 PM, Mohit Anchlia <mohitanch...@gmail.com> > wrote: > >> Is something similar to park.streaming.receiver.writeAheadLog.enable >> available on SparkContext? It looks like it only works for spark streaming. >> > >
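For reference, a minimal Java sketch of how the receiver write-ahead log is enabled in Spark Streaming, the only place the config discussed above applies; the checkpoint path and port are hypothetical. The WAL is stored under the checkpoint directory, so one has to be set.

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class WalExample {
  public static void main(String[] args) throws Exception {
    SparkConf conf = new SparkConf()
        .setMaster("local[2]")  // local master only for the sketch
        .setAppName("WalExample")
        // receiver write-ahead log; a streaming-only setting, as discussed above
        .set("spark.streaming.receiver.writeAheadLog.enable", "true");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
    // the WAL is written under the checkpoint directory
    jssc.checkpoint("hdfs:///user/checkpoints/wal-example");
    jssc.socketTextStream("localhost", 9999).print();
    jssc.start();
    jssc.awaitTermination();
  }
}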
Write Ahead Log
Is something similar to park.streaming.receiver.writeAheadLog.enable available on SparkContext? It looks like it only works for spark streaming.
Re: Dealing with failures
On Wed, Jun 8, 2016 at 3:42 AM, Jacek Laskowski <ja...@japila.pl> wrote: > On Wed, Jun 8, 2016 at 2:38 AM, Mohit Anchlia <mohitanch...@gmail.com> > wrote: > > I am looking to write an ETL job using spark that reads data from the > > source, perform transformation and insert it into the destination. > > Is this going to be one-time job or you want it to run every time interval? > > > 1. Source becomes slow or un-responsive. How to control such a situation > so > > that it doesn't cause DDoS on the source? > > Why do you think Spark would DDoS the source? I'm reading it as if > Spark tried to open a new connection after the currently-open one > became slow. I don't think it's how Spark does connections. What is > the source in your use case? > >> I was primarily concerned about retry storms causing a DDoS on the source. How does Spark deal with a scenario where it gets a timeout from the source? Does it retry or does it fail? And if the task fails, does it fail the job? And is it possible to restart the job and only process the failed tasks and the remaining pending tasks? My use case is reading from Cassandra, performing some transformation, and saving the data to a different Cassandra cluster. I want to make sure that the data is reliably copied without missing data. At the same time I also want to make sure that the process doesn't cause a performance impact on other live production traffic to these clusters when there are failures, e.g. retry storms effectively becoming a DDoS. > > Also, at the same time how to make it resilient that it does pick up > from where it left? > > It sounds like checkpointing. It's available in Core and Streaming. > So, what's your source and how often do you want to query for data? > You may also benefit from the recent additions to Spark in 2.0 called > Structured Streaming (aka Streaming Datasets) - see > https://issues.apache.org/jira/browse/SPARK-8360. > > >> Does checkpointing help with the failure scenario that I described above? I read checkpointing as a way to restart processing of data if tasks fail because of Spark cluster issues. Does it also work in the scenario that I described? > > 2. In the same context when destination becomes slow or un-responsive. > > What is a destination? It appears as if you were doing streaming and > want to use checkpointing and back-pressure. But you haven't said much > about your use case to be precise. > > Jacek >
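To illustrate the core-Spark checkpointing Jacek mentions (this does not by itself answer the retry/DDoS question), a minimal Java sketch; the HDFS checkpoint path is hypothetical. Once an RDD is checkpointed, a lost partition can be recomputed from the checkpoint files rather than by going back to the original source.

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class CoreCheckpointSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("CoreCheckpointSketch");
    JavaSparkContext sc = new JavaSparkContext(conf);
    // checkpoint data is written here; the directory is hypothetical
    sc.setCheckpointDir("hdfs:///user/checkpoints/etl");
    JavaRDD<Integer> data = sc.parallelize(Arrays.asList(1, 2, 3, 4));
    JavaRDD<Integer> transformed = data.map(x -> x * 2);
    // mark the RDD for checkpointing before any action runs on it
    transformed.checkpoint();
    transformed.count();  // the action triggers the checkpoint
    sc.stop();
  }
}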
Dealing with failures
I am looking to write an ETL job using Spark that reads data from the source, performs transformations, and inserts the results into the destination. I am trying to understand how Spark deals with failures; I can't seem to find the documentation. I am interested in the following scenarios: 1. The source becomes slow or unresponsive. How do I control such a situation so that it doesn't cause a DDoS on the source? Also, how do I make the job resilient so that it picks up from where it left off? 2. The same questions when the destination becomes slow or unresponsive.
Re: Too many files/dirs in hdfs
Based on what I've read it appears that when using spark streaming there is no good way of optimizing the files on HDFS. Spark streaming writes many small files which is not scalable in apache hadoop. Only other way seem to be to read files after it has been written and merge them to a bigger file, which seems like a extra overhead from maintenance and IO perspective. On Mon, Aug 24, 2015 at 2:51 PM, Mohit Anchlia mohitanch...@gmail.com wrote: Any help would be appreciated On Wed, Aug 19, 2015 at 9:38 AM, Mohit Anchlia mohitanch...@gmail.com wrote: My question was how to do this in Hadoop? Could somebody point me to some examples? On Tue, Aug 18, 2015 at 10:43 PM, UMESH CHAUDHARY umesh9...@gmail.com wrote: Of course, Java or Scala can do that: 1) Create a FileWriter with append or roll over option 2) For each RDD create a StringBuilder after applying your filters 3) Write this StringBuilder to File when you want to write (The duration can be defined as a condition) On Tue, Aug 18, 2015 at 11:05 PM, Mohit Anchlia mohitanch...@gmail.com wrote: Is there a way to store all the results in one file and keep the file roll over separate than the spark streaming batch interval? On Mon, Aug 17, 2015 at 2:39 AM, UMESH CHAUDHARY umesh9...@gmail.com wrote: In Spark Streaming you can simply check whether your RDD contains any records or not and if records are there you can save them using FIleOutputStream: DStream.foreachRDD(t= { var count = t.count(); if (count0){ // SAVE YOUR STUFF} }; This will not create unnecessary files of 0 bytes. On Mon, Aug 17, 2015 at 2:51 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Currently, spark streaming would create a new directory for every batch and store the data to it (whether it has anything or not). There is no direct append call as of now, but you can achieve this either with FileUtil.copyMerge http://apache-spark-user-list.1001560.n3.nabble.com/save-spark-streaming-output-to-single-file-on-hdfs-td21124.html#a21167 or have a separate program which will do the clean up for you. Thanks Best Regards On Sat, Aug 15, 2015 at 5:20 AM, Mohit Anchlia mohitanch...@gmail.com wrote: Spark stream seems to be creating 0 bytes files even when there is no data. Also, I have 2 concerns here: 1) Extra unnecessary files is being created from the output 2) Hadoop doesn't work really well with too many files and I see that it is creating a directory with a timestamp every 1 second. Is there a better way of writing a file, may be use some kind of append mechanism where one doesn't have to change the batch interval.
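Since the thread points at FileUtil.copyMerge as the post-processing option, a minimal Java sketch of merging the small part files of one batch directory into a single HDFS file; the paths are hypothetical and deleteSource=true removes the small files after the merge.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

public class MergeSmallFiles {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // merge every part file under one batch directory into a single file
    FileUtil.copyMerge(fs, new Path("/streaming/out-batchdir"),
                       fs, new Path("/streaming/merged/out-batch.txt"),
                       true /* delete source */, conf, null);
  }
}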
Re: Too many files/dirs in hdfs
Any help would be appreciated On Wed, Aug 19, 2015 at 9:38 AM, Mohit Anchlia mohitanch...@gmail.com wrote: My question was how to do this in Hadoop? Could somebody point me to some examples? On Tue, Aug 18, 2015 at 10:43 PM, UMESH CHAUDHARY umesh9...@gmail.com wrote: Of course, Java or Scala can do that: 1) Create a FileWriter with append or roll over option 2) For each RDD create a StringBuilder after applying your filters 3) Write this StringBuilder to File when you want to write (The duration can be defined as a condition) On Tue, Aug 18, 2015 at 11:05 PM, Mohit Anchlia mohitanch...@gmail.com wrote: Is there a way to store all the results in one file and keep the file roll over separate than the spark streaming batch interval? On Mon, Aug 17, 2015 at 2:39 AM, UMESH CHAUDHARY umesh9...@gmail.com wrote: In Spark Streaming you can simply check whether your RDD contains any records or not and if records are there you can save them using FIleOutputStream: DStream.foreachRDD(t= { var count = t.count(); if (count0){ // SAVE YOUR STUFF} }; This will not create unnecessary files of 0 bytes. On Mon, Aug 17, 2015 at 2:51 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Currently, spark streaming would create a new directory for every batch and store the data to it (whether it has anything or not). There is no direct append call as of now, but you can achieve this either with FileUtil.copyMerge http://apache-spark-user-list.1001560.n3.nabble.com/save-spark-streaming-output-to-single-file-on-hdfs-td21124.html#a21167 or have a separate program which will do the clean up for you. Thanks Best Regards On Sat, Aug 15, 2015 at 5:20 AM, Mohit Anchlia mohitanch...@gmail.com wrote: Spark stream seems to be creating 0 bytes files even when there is no data. Also, I have 2 concerns here: 1) Extra unnecessary files is being created from the output 2) Hadoop doesn't work really well with too many files and I see that it is creating a directory with a timestamp every 1 second. Is there a better way of writing a file, may be use some kind of append mechanism where one doesn't have to change the batch interval.
Re: Too many files/dirs in hdfs
My question was how to do this in Hadoop? Could somebody point me to some examples? On Tue, Aug 18, 2015 at 10:43 PM, UMESH CHAUDHARY umesh9...@gmail.com wrote: Of course, Java or Scala can do that: 1) Create a FileWriter with append or roll over option 2) For each RDD create a StringBuilder after applying your filters 3) Write this StringBuilder to File when you want to write (The duration can be defined as a condition) On Tue, Aug 18, 2015 at 11:05 PM, Mohit Anchlia mohitanch...@gmail.com wrote: Is there a way to store all the results in one file and keep the file roll over separate than the spark streaming batch interval? On Mon, Aug 17, 2015 at 2:39 AM, UMESH CHAUDHARY umesh9...@gmail.com wrote: In Spark Streaming you can simply check whether your RDD contains any records or not and if records are there you can save them using FIleOutputStream: DStream.foreachRDD(t= { var count = t.count(); if (count0){ // SAVE YOUR STUFF} }; This will not create unnecessary files of 0 bytes. On Mon, Aug 17, 2015 at 2:51 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Currently, spark streaming would create a new directory for every batch and store the data to it (whether it has anything or not). There is no direct append call as of now, but you can achieve this either with FileUtil.copyMerge http://apache-spark-user-list.1001560.n3.nabble.com/save-spark-streaming-output-to-single-file-on-hdfs-td21124.html#a21167 or have a separate program which will do the clean up for you. Thanks Best Regards On Sat, Aug 15, 2015 at 5:20 AM, Mohit Anchlia mohitanch...@gmail.com wrote: Spark stream seems to be creating 0 bytes files even when there is no data. Also, I have 2 concerns here: 1) Extra unnecessary files is being created from the output 2) Hadoop doesn't work really well with too many files and I see that it is creating a directory with a timestamp every 1 second. Is there a better way of writing a file, may be use some kind of append mechanism where one doesn't have to change the batch interval.
Re: Too many files/dirs in hdfs
Is there a way to store all the results in one file and keep the file roll over separate than the spark streaming batch interval? On Mon, Aug 17, 2015 at 2:39 AM, UMESH CHAUDHARY umesh9...@gmail.com wrote: In Spark Streaming you can simply check whether your RDD contains any records or not and if records are there you can save them using FIleOutputStream: DStream.foreachRDD(t= { var count = t.count(); if (count0){ // SAVE YOUR STUFF} }; This will not create unnecessary files of 0 bytes. On Mon, Aug 17, 2015 at 2:51 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Currently, spark streaming would create a new directory for every batch and store the data to it (whether it has anything or not). There is no direct append call as of now, but you can achieve this either with FileUtil.copyMerge http://apache-spark-user-list.1001560.n3.nabble.com/save-spark-streaming-output-to-single-file-on-hdfs-td21124.html#a21167 or have a separate program which will do the clean up for you. Thanks Best Regards On Sat, Aug 15, 2015 at 5:20 AM, Mohit Anchlia mohitanch...@gmail.com wrote: Spark stream seems to be creating 0 bytes files even when there is no data. Also, I have 2 concerns here: 1) Extra unnecessary files is being created from the output 2) Hadoop doesn't work really well with too many files and I see that it is creating a directory with a timestamp every 1 second. Is there a better way of writing a file, may be use some kind of append mechanism where one doesn't have to change the batch interval.
Too many files/dirs in hdfs
Spark Streaming seems to be creating 0-byte files even when there is no data. Also, I have 2 concerns here: 1) extra, unnecessary files are being created from the output; 2) Hadoop doesn't work really well with too many files, and I see that it is creating a directory with a timestamp every 1 second. Is there a better way of writing a file, maybe using some kind of append mechanism, so that one doesn't have to change the batch interval?
Re: Spark RuntimeException hadoop output format
Spark 1.3. Code:

wordCounts.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
  @Override
  public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws IOException {
    String counts = "Counts at time " + time + " " + rdd.collect();
    System.out.println(counts);
    System.out.println("Appending to " + outputFile.getAbsolutePath());
    Files.append(counts + "\n", outputFile, Charset.defaultCharset());
    return null;
  }
});
wordCounts.saveAsHadoopFiles(outputPath, "txt", Text.class, Text.class, TextOutputFormat.class);

What do I need to check in namenode? I see 0 bytes files like this:
drwxr-xr-x - ec2-user supergroup 0 2015-08-13 15:45 /tmp/out-1439495124000.txt
drwxr-xr-x - ec2-user supergroup 0 2015-08-13 15:45 /tmp/out-1439495125000.txt
drwxr-xr-x - ec2-user supergroup 0 2015-08-13 15:45 /tmp/out-1439495126000.txt
drwxr-xr-x - ec2-user supergroup 0 2015-08-13 15:45 /tmp/out-1439495127000.txt
drwxr-xr-x - ec2-user supergroup 0 2015-08-13 15:45 /tmp/out-1439495128000.txt
However, I also wrote data to a local file on the local file system for verification and I see the data:
$ ls -ltr
!$ ls -ltr /tmp/out
-rw-r--r-- 1 yarn yarn 5230 Aug 13 15:45 /tmp/out
On Fri, Aug 14, 2015 at 6:15 AM, Ted Yu yuzhih...@gmail.com wrote: Which Spark release are you using ? Can you show us snippet of your code ? Have you checked namenode log ? Thanks On Aug 13, 2015, at 10:21 PM, Mohit Anchlia mohitanch...@gmail.com wrote: I was able to get this working by using an alternative method however I only see 0 bytes files in hadoop. I've verified that the output does exist in the logs however it's missing from hdfs. On Thu, Aug 13, 2015 at 10:49 AM, Mohit Anchlia mohitanch...@gmail.com wrote: I have this call trying to save to hdfs 2.6 wordCounts.saveAsNewAPIHadoopFiles(prefix, txt); but I am getting the following: java.lang.RuntimeException: class scala.runtime.Nothing$ not org.apache.hadoop.mapreduce.OutputFormat
Re: Spark RuntimeException hadoop output format
I thought prefix meant the output path? What's the purpose of prefix and where do I specify the path if not in prefix? On Fri, Aug 14, 2015 at 4:36 PM, Ted Yu yuzhih...@gmail.com wrote: Please take a look at JavaPairDStream.scala: def saveAsHadoopFiles[F : OutputFormat[_, _]]( prefix: String, suffix: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[F]) { Did you intend to use outputPath as prefix ? Cheers On Fri, Aug 14, 2015 at 1:36 PM, Mohit Anchlia mohitanch...@gmail.com wrote: Spark 1.3 Code: wordCounts.foreachRDD(*new* *Function2JavaPairRDDString, Integer, Time, Void()* { @Override *public* Void call(JavaPairRDDString, Integer rdd, Time time) *throws* IOException { String counts = Counts at time + time + + rdd.collect(); System.*out*.println(counts); System.*out*.println(Appending to + outputFile.getAbsolutePath()); Files.*append*(counts + \n, outputFile, Charset.*defaultCharset*()); *return* *null*; } }); wordCounts.saveAsHadoopFiles(outputPath, txt, Text.*class*, Text. *class*, TextOutputFormat.*class*); What do I need to check in namenode? I see 0 bytes files like this: drwxr-xr-x - ec2-user supergroup 0 2015-08-13 15:45 /tmp/out-1439495124000.txt drwxr-xr-x - ec2-user supergroup 0 2015-08-13 15:45 /tmp/out-1439495125000.txt drwxr-xr-x - ec2-user supergroup 0 2015-08-13 15:45 /tmp/out-1439495126000.txt drwxr-xr-x - ec2-user supergroup 0 2015-08-13 15:45 /tmp/out-1439495127000.txt drwxr-xr-x - ec2-user supergroup 0 2015-08-13 15:45 /tmp/out-1439495128000.txt However, I also wrote data to a local file on the local file system for verification and I see the data: $ ls -ltr !$ ls -ltr /tmp/out -rw-r--r-- 1 yarn yarn 5230 Aug 13 15:45 /tmp/out On Fri, Aug 14, 2015 at 6:15 AM, Ted Yu yuzhih...@gmail.com wrote: Which Spark release are you using ? Can you show us snippet of your code ? Have you checked namenode log ? Thanks On Aug 13, 2015, at 10:21 PM, Mohit Anchlia mohitanch...@gmail.com wrote: I was able to get this working by using an alternative method however I only see 0 bytes files in hadoop. I've verified that the output does exist in the logs however it's missing from hdfs. On Thu, Aug 13, 2015 at 10:49 AM, Mohit Anchlia mohitanch...@gmail.com wrote: I have this call trying to save to hdfs 2.6 wordCounts.saveAsNewAPIHadoopFiles(prefix, txt); but I am getting the following: java.lang.RuntimeException: class scala.runtime.Nothing$ not org.apache.hadoop.mapreduce.OutputFormat
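Following Ted's explanation, the prefix is only the leading part of the generated name; Spark appends "-<batch time in ms>" and the suffix, so a full HDFS path can be used as the prefix and the key/value/output-format classes must be given explicitly (the missing classes are what produces the scala.runtime.Nothing$ RuntimeException seen below). A minimal Java sketch, with hypothetical host, port and output path:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

public class SaveAsHadoopFilesSketch {
  public static void main(String[] args) throws Exception {
    SparkConf conf = new SparkConf().setAppName("SaveAsHadoopFilesSketch");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
    JavaPairDStream<String, Integer> counts = jssc.socketTextStream("localhost", 9999)
        .mapToPair(line -> new Tuple2<>(line, 1))
        .reduceByKey((a, b) -> a + b);
    // prefix is the start of the generated name; each batch lands in
    // hdfs:///user/ec2-user/out/wordcounts-<batch time ms>.txt
    counts.saveAsHadoopFiles("hdfs:///user/ec2-user/out/wordcounts", "txt",
        Text.class, Text.class, TextOutputFormat.class);
    jssc.start();
    jssc.awaitTermination();
  }
}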
Executors on multiple nodes
I am running on Yarn and do have a question on how spark runs executors on different data nodes. Is that primarily decided based on number of receivers? What do I need to do to ensure that multiple nodes are being used for data processing?
Spark RuntimeException hadoop output format
I have this call trying to save to hdfs 2.6 wordCounts.saveAsNewAPIHadoopFiles(prefix, txt); but I am getting the following: java.lang.RuntimeException: class scala.runtime.Nothing$ not org.apache.hadoop.mapreduce.OutputFormat
Re: Spark RuntimeException hadoop output format
I was able to get this working by using an alternative method however I only see 0 bytes files in hadoop. I've verified that the output does exist in the logs however it's missing from hdfs. On Thu, Aug 13, 2015 at 10:49 AM, Mohit Anchlia mohitanch...@gmail.com wrote: I have this call trying to save to hdfs 2.6 wordCounts.saveAsNewAPIHadoopFiles(prefix, txt); but I am getting the following: java.lang.RuntimeException: class scala.runtime.Nothing$ not org.apache.hadoop.mapreduce.OutputFormat
Unit Testing
Is there a way to run spark streaming methods in standalone eclipse environment to test out the functionality?
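One pattern that is often used for this (a sketch, not necessarily the recommended approach) is to run the streaming code inside the IDE with a local[2] master and feed predefined batches through queueStream, so no cluster or socket source is needed; the sleep/stop timings below are arbitrary.

import java.util.Arrays;
import java.util.LinkedList;
import java.util.Queue;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class LocalStreamingTest {
  public static void main(String[] args) throws Exception {
    // local[2]: one thread for receiving, one for processing
    SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("LocalStreamingTest");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

    // queueStream lets a test push predefined RDDs as the input batches
    Queue<JavaRDD<String>> batches = new LinkedList<>();
    batches.add(jssc.sparkContext().parallelize(Arrays.asList("a b", "b c")));
    JavaDStream<String> lines = jssc.queueStream(batches);

    JavaDStream<Integer> lengths = lines.map(String::length);
    lengths.print();  // in a real test, collect the results and assert on them

    jssc.start();
    Thread.sleep(3000);  // let a couple of batches run
    jssc.stop(true, true);
  }
}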
Re: Partitioning in spark streaming
I am also trying to understand how files are named when writing to Hadoop. For example, how does the saveAs method ensure that each executor generates unique files? On Tue, Aug 11, 2015 at 4:21 PM, ayan guha guha.a...@gmail.com wrote: partitioning - by itself - is a property of RDD. so essentially it is no different in case of streaming where each batch is one RDD. You can use partitionBy on RDD and pass on your custom partitioner function to it. One thing you should consider is how balanced are your partitions ie your partition scheme should not skew data into one partition too much. Best Ayan On Wed, Aug 12, 2015 at 9:06 AM, Mohit Anchlia mohitanch...@gmail.com wrote: How does partitioning in spark work when it comes to streaming? What's the best way to partition a time series data grouped by a certain tag like categories of product video, music etc. -- Best Regards, Ayan Guha
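To illustrate the partitionBy suggestion above, a minimal Java sketch that hash-partitions a pair RDD by its key (the category tag); the data, the partition count of 4, and the local master are all just for the sketch. A custom org.apache.spark.Partitioner subclass could be passed instead of HashPartitioner.

import java.util.Arrays;
import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class RepartitionByKey {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("RepartitionByKey");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaPairRDD<String, Integer> byCategory = sc.parallelizePairs(Arrays.asList(
        new Tuple2<>("video", 1), new Tuple2<>("music", 1), new Tuple2<>("video", 1)));
    // hash-partition by the category key into 4 partitions
    JavaPairRDD<String, Integer> partitioned = byCategory.partitionBy(new HashPartitioner(4));
    System.out.println("partitions: " + partitioned.partitions().size());
    sc.stop();
  }
}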
Re: ClassNotFound spark streaming
After changing the '--deploy_mode client' the program seems to work however it looks like there is a bug in spark when using --deploy_mode as 'yarn'. Should I open a bug? On Tue, Aug 11, 2015 at 3:02 PM, Mohit Anchlia mohitanch...@gmail.com wrote: I see the following line in the log 15/08/11 17:59:12 ERROR spark.SparkContext: Jar not found at file:/home/ec2-user/./spark-streaming-test-0.0.1-SNAPSHOT-jar-with-dependencies.jar, however I do see that this file exists on all the node in that path. Not sure what's happening here. Please note I am using it in yarn On Tue, Aug 11, 2015 at 1:52 PM, Mohit Anchlia mohitanch...@gmail.com wrote: I am seeing following error. I think it's not able to find some other associated classes as I see $2 in the exception, but not sure what I am missing. 15/08/11 16:00:15 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 3.0 (TID 50, ip-10-241-251-141.us-west-2.compute.internal): java.lang.ClassNotFoundException: org.sony.spark.stream.test.JavaRecoverableNetworkWordCount$2 at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:270) at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:65) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at scala.collection.immutable.$colon$colon.readObject(List.scala:362) at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
Partitioning in spark streaming
How does partitioning in Spark work when it comes to streaming? What's the best way to partition time series data grouped by a certain tag, like categories of product (video, music, etc.)?
Re: ClassNotFound spark streaming
I see the following line in the log 15/08/11 17:59:12 ERROR spark.SparkContext: Jar not found at file:/home/ec2-user/./spark-streaming-test-0.0.1-SNAPSHOT-jar-with-dependencies.jar, however I do see that this file exists on all the node in that path. Not sure what's happening here. Please note I am using it in yarn On Tue, Aug 11, 2015 at 1:52 PM, Mohit Anchlia mohitanch...@gmail.com wrote: I am seeing following error. I think it's not able to find some other associated classes as I see $2 in the exception, but not sure what I am missing. 15/08/11 16:00:15 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 3.0 (TID 50, ip-10-241-251-141.us-west-2.compute.internal): java.lang.ClassNotFoundException: org.sony.spark.stream.test.JavaRecoverableNetworkWordCount$2 at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:270) at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:65) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at scala.collection.immutable.$colon$colon.readObject(List.scala:362) at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
Re: Partitioning in spark streaming
Thanks for the info. When data is written to HDFS, how does Spark keep the filenames written by multiple executors unique? On Tue, Aug 11, 2015 at 9:35 PM, Hemant Bhanawat hemant9...@gmail.com wrote: Posting a comment from my previous mail post: When data is received from a stream source, receiver creates blocks of data. A new block of data is generated every blockInterval milliseconds. N blocks of data are created during the batchInterval where N = batchInterval/blockInterval. An RDD is created on the driver for the blocks created during the batchInterval. The blocks generated during the batchInterval are partitions of the RDD. Now if you want to repartition based on a key, a shuffle is needed. On Wed, Aug 12, 2015 at 4:36 AM, Mohit Anchlia mohitanch...@gmail.com wrote: How does partitioning in spark work when it comes to streaming? What's the best way to partition a time series data grouped by a certain tag like categories of product video, music etc.
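A minimal Java sketch of the batchInterval/blockInterval relationship Hemant describes, assuming the Spark 2.x Java API; the 2000 ms batch interval, 200 ms block interval, host and port are hypothetical. With these values each receiver-based batch RDD should have roughly 2000 / 200 = 10 blocks, i.e. 10 partitions.

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class BlockIntervalSketch {
  public static void main(String[] args) throws Exception {
    SparkConf conf = new SparkConf()
        .setMaster("local[2]")  // local master only for the sketch
        .setAppName("BlockIntervalSketch")
        // the receiver cuts its input into one block every 200 ms
        .set("spark.streaming.blockInterval", "200ms");
    // one RDD per 2000 ms batch -> about 2000 / 200 = 10 blocks (partitions) per batch
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.milliseconds(2000));
    jssc.socketTextStream("localhost", 9999)
        .foreachRDD(rdd -> System.out.println("partitions in this batch: " + rdd.partitions().size()));
    jssc.start();
    jssc.awaitTermination();
  }
}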
Re: Streaming of WordCount example
I am running as a yarn-client which probably means that the program that submitted the job is where the listening is also occurring? I thought that the yarn is only used to negotiate resources in yarn-client master mode. On Mon, Aug 10, 2015 at 11:34 AM, Tathagata Das t...@databricks.com wrote: If you are running on a cluster, the listening is occurring on one of the executors, not in the driver. On Mon, Aug 10, 2015 at 10:29 AM, Mohit Anchlia mohitanch...@gmail.com wrote: I am trying to run this program as a yarn-client. The job seems to be submitting successfully however I don't see any process listening on this host on port https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java Active Jobs (2)Job IdDescriptionSubmittedDurationStages: Succeeded/TotalTasks (for all stages): Succeeded/Total1foreachRDD at JavaRecoverableNetworkWordCount.java:112 http://ec2-52-25-118-171.us-west-2.compute.amazonaws.com:8088/proxy/application_1438820875993_0007/jobs/job?id=12015/08/10 13:27:3651 s0/2 0/2 0start at JavaRecoverableNetworkWordCount.java:152 http://ec2-52-25-118-171.us-west-2.compute.amazonaws.com:8088/proxy/application_1438820875993_0007/jobs/job?id=02015/08/10 13:27:3551 s0/2 0/70
Re: Streaming of WordCount example
I am using the same exact code: https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java Submitting like this: yarn: /opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/bin/spark-submit --class org.sony.spark.stream.test.JavaRecoverableNetworkWordCount --master yarn-client --total-executor-cores 3 ./spark-streaming-test-0.0.1-SNAPSHOT-jar-with-dependencies.jar localhost /user/ec2-user/checkpoint/ /user/ec2-user/out local: /opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/bin/spark-submit --class org.sony.spark.stream.test.JavaRecoverableNetworkWordCount --master spark://localhost:9966 --total-executor-cores 3 ./spark-streaming-test-0.0.1-SNAPSHOT-jar-with-dependencies.jar localhost /user/ec2-user/checkpoint/ /user/ec2-user/out Even though I am running as local I see it being scheduled and managed by yarn. On Mon, Aug 10, 2015 at 12:56 PM, Tathagata Das t...@databricks.com wrote: Is it receiving any data? If so, then it must be listening. Alternatively, to test these theories, you can locally running a spark standalone cluster (one node standalone cluster in local machine), and submit your app in client mode on that to see whether you are seeing the process listening on or not. On Mon, Aug 10, 2015 at 12:43 PM, Mohit Anchlia mohitanch...@gmail.com wrote: I've verified all the executors and I don't see a process listening on the port. However, the application seem to show as running in the yarn UI On Mon, Aug 10, 2015 at 11:56 AM, Tathagata Das t...@databricks.com wrote: In yarn-client mode, the driver is on the machine where you ran the spark-submit. The executors are running in the YARN cluster nodes, and the socket receiver listening on port is running in one of the executors. On Mon, Aug 10, 2015 at 11:43 AM, Mohit Anchlia mohitanch...@gmail.com wrote: I am running as a yarn-client which probably means that the program that submitted the job is where the listening is also occurring? I thought that the yarn is only used to negotiate resources in yarn-client master mode. On Mon, Aug 10, 2015 at 11:34 AM, Tathagata Das t...@databricks.com wrote: If you are running on a cluster, the listening is occurring on one of the executors, not in the driver. On Mon, Aug 10, 2015 at 10:29 AM, Mohit Anchlia mohitanch...@gmail.com wrote: I am trying to run this program as a yarn-client. The job seems to be submitting successfully however I don't see any process listening on this host on port https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java Active Jobs (2)Job IdDescriptionSubmittedDurationStages: Succeeded/TotalTasks (for all stages): Succeeded/Total1foreachRDD at JavaRecoverableNetworkWordCount.java:112 http://ec2-52-25-118-171.us-west-2.compute.amazonaws.com:8088/proxy/application_1438820875993_0007/jobs/job?id=12015/08/10 13:27:3651 s0/2 0/2 0start at JavaRecoverableNetworkWordCount.java:152 http://ec2-52-25-118-171.us-west-2.compute.amazonaws.com:8088/proxy/application_1438820875993_0007/jobs/job?id=02015/08/10 13:27:3551 s0/2 0/70
Re: Streaming of WordCount example
I do see this message: 15/08/10 19:19:12 WARN YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources On Mon, Aug 10, 2015 at 4:15 PM, Mohit Anchlia mohitanch...@gmail.com wrote: I am using the same exact code: https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java Submitting like this: yarn: /opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/bin/spark-submit --class org.sony.spark.stream.test.JavaRecoverableNetworkWordCount --master yarn-client --total-executor-cores 3 ./spark-streaming-test-0.0.1-SNAPSHOT-jar-with-dependencies.jar localhost /user/ec2-user/checkpoint/ /user/ec2-user/out local: /opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/bin/spark-submit --class org.sony.spark.stream.test.JavaRecoverableNetworkWordCount --master spark://localhost:9966 --total-executor-cores 3 ./spark-streaming-test-0.0.1-SNAPSHOT-jar-with-dependencies.jar localhost /user/ec2-user/checkpoint/ /user/ec2-user/out Even though I am running as local I see it being scheduled and managed by yarn. On Mon, Aug 10, 2015 at 12:56 PM, Tathagata Das t...@databricks.com wrote: Is it receiving any data? If so, then it must be listening. Alternatively, to test these theories, you can locally running a spark standalone cluster (one node standalone cluster in local machine), and submit your app in client mode on that to see whether you are seeing the process listening on or not. On Mon, Aug 10, 2015 at 12:43 PM, Mohit Anchlia mohitanch...@gmail.com wrote: I've verified all the executors and I don't see a process listening on the port. However, the application seem to show as running in the yarn UI On Mon, Aug 10, 2015 at 11:56 AM, Tathagata Das t...@databricks.com wrote: In yarn-client mode, the driver is on the machine where you ran the spark-submit. The executors are running in the YARN cluster nodes, and the socket receiver listening on port is running in one of the executors. On Mon, Aug 10, 2015 at 11:43 AM, Mohit Anchlia mohitanch...@gmail.com wrote: I am running as a yarn-client which probably means that the program that submitted the job is where the listening is also occurring? I thought that the yarn is only used to negotiate resources in yarn-client master mode. On Mon, Aug 10, 2015 at 11:34 AM, Tathagata Das t...@databricks.com wrote: If you are running on a cluster, the listening is occurring on one of the executors, not in the driver. On Mon, Aug 10, 2015 at 10:29 AM, Mohit Anchlia mohitanch...@gmail.com wrote: I am trying to run this program as a yarn-client. The job seems to be submitting successfully however I don't see any process listening on this host on port https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java Active Jobs (2)Job IdDescriptionSubmittedDurationStages: Succeeded/TotalTasks (for all stages): Succeeded/Total1foreachRDD at JavaRecoverableNetworkWordCount.java:112 http://ec2-52-25-118-171.us-west-2.compute.amazonaws.com:8088/proxy/application_1438820875993_0007/jobs/job?id=12015/08/10 13:27:3651 s0/2 0/2 0start at JavaRecoverableNetworkWordCount.java:152 http://ec2-52-25-118-171.us-west-2.compute.amazonaws.com:8088/proxy/application_1438820875993_0007/jobs/job?id=02015/08/10 13:27:3551 s0/2 0/70
Checkpoint Dir Error in Yarn
I am running in yarn-client mode and trying to execute the network word count example. When I connect through nc I see the following in the Spark app logs: Exception in thread "main" java.lang.AssertionError: assertion failed: The checkpoint directory has not been set. Please use StreamingContext.checkpoint() or SparkContext.checkpoint() to set the checkpoint directory. at scala.Predef$.assert(Predef.scala:179) at org.apache.spark.streaming.dstream.DStream.validate(DStream.scala:183) at org.apache.spark.streaming.dstream.DStream$$anonfun$validate$10.apply(DStream.scala:229) at org.apache.spark.streaming.dstream.DStream$$anonfun$validate$10.apply(DStream.scala:229) at scala.collection
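This assertion is usually raised when a DStream requires checkpointing (stateful or windowed operations, or the recoverable word count's driver recovery) but no checkpoint directory was set before start(). A minimal Java sketch of setting it, with a hypothetical HDFS path, host and port:

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class CheckpointDirSketch {
  public static void main(String[] args) throws Exception {
    SparkConf conf = new SparkConf().setAppName("NetworkWordCount");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
    // must be called before start(); the HDFS path is hypothetical
    jssc.checkpoint("hdfs:///user/ec2-user/checkpoint");
    jssc.socketTextStream("localhost", 9999).print();
    jssc.start();
    jssc.awaitTermination();
  }
}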
SparkR
Does SparkR support all the algorithms that R library supports?
Re: Class incompatible error
I changed the JDK to Oracle but I still get this error. Not sure what it means by Stream class is incompatible with local class. I am using the following build on the server spark-1.2.1-bin-hadoop2.4 15/04/09 15:26:24 ERROR JobScheduler: Error running job streaming job 1428607584000 ms.0 org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 15.0 failed 4 times, most recent failure: Lost task 0.3 in stage 15.0 (TID 66, ip-10-241-251-232.us-west-2.compute.internal): java.io.InvalidClassException: org.apache.spark.Aggregator; local class incompatible: stream classdesc serialVersionUID = 5032037208639381169, local class serialVersionUID = -9085606473104903453 at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:617) On Wed, Apr 8, 2015 at 3:40 PM, Ted Yu yuzhih...@gmail.com wrote: bq. one is Oracle and the other is OpenJDK I don't have experience with mixed JDK's. Can you try with using single JDK ? Cheers On Wed, Apr 8, 2015 at 3:26 PM, Mohit Anchlia mohitanch...@gmail.com wrote: For the build I am using java version 1.7.0_65 which seems to be the same as the one on the spark host. However one is Oracle and the other is OpenJDK. Does that make any difference? On Wed, Apr 8, 2015 at 1:24 PM, Ted Yu yuzhih...@gmail.com wrote: What version of Java do you use to build ? Cheers On Wed, Apr 8, 2015 at 12:43 PM, Mohit Anchlia mohitanch...@gmail.com wrote: I am seeing the following, is this because of my maven version? 15/04/08 15:42:22 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, ip-10-241-251-232.us-west-2.compute.internal): java.io.InvalidClassException: org.apache.spark.Aggregator; local class incompatible: stream classdesc serialVersionUID = 5032037208639381169, local class serialVersionUID = -9085606473104903453 at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:617) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) dependency groupIdorg.apache.spark/groupId artifactIdspark-streaming_2.10/artifactId version1.2.0/version /dependency dependency groupIdorg.apache.spark/groupId artifactIdspark-core_2.10/artifactId version1.2.0/version /dependency
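The serialVersionUID mismatch on org.apache.spark.Aggregator usually means the application was built against a different Spark build than the one running on the cluster, rather than a JDK issue. A sketch of aligning the Maven dependencies quoted below with the spark-1.2.1-bin-hadoop2.4 cluster and marking them provided so the cluster's own jars are used at runtime; the 1.2.1 version and the provided scope are assumptions based on this thread, not a confirmed fix.

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.2.1</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.10</artifactId>
  <version>1.2.1</version>
  <scope>provided</scope>
</dependency>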
start-slave.sh not starting
I am trying to start the worker by: sbin/start-slave.sh spark://ip-10-241-251-232:7077 In the logs it's complaining about: Master must be a URL of the form spark://hostname:port I also have this in spark-defaults.conf spark.master spark://ip-10-241-251-232:7077 Did I miss anything?
Class incompatible error
I am seeing the following, is this because of my maven version?

15/04/08 15:42:22 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, ip-10-241-251-232.us-west-2.compute.internal): java.io.InvalidClassException: org.apache.spark.Aggregator; local class incompatible: stream classdesc serialVersionUID = 5032037208639381169, local class serialVersionUID = -9085606473104903453 at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:617) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.10</artifactId>
  <version>1.2.0</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.2.0</version>
</dependency>
Re: Class incompatible error
For the build I am using java version 1.7.0_65 which seems to be the same as the one on the spark host. However one is Oracle and the other is OpenJDK. Does that make any difference? On Wed, Apr 8, 2015 at 1:24 PM, Ted Yu yuzhih...@gmail.com wrote: What version of Java do you use to build ? Cheers On Wed, Apr 8, 2015 at 12:43 PM, Mohit Anchlia mohitanch...@gmail.com wrote: I am seeing the following, is this because of my maven version? 15/04/08 15:42:22 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, ip-10-241-251-232.us-west-2.compute.internal): java.io.InvalidClassException: org.apache.spark.Aggregator; local class incompatible: stream classdesc serialVersionUID = 5032037208639381169, local class serialVersionUID = -9085606473104903453 at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:617) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) dependency groupIdorg.apache.spark/groupId artifactIdspark-streaming_2.10/artifactId version1.2.0/version /dependency dependency groupIdorg.apache.spark/groupId artifactIdspark-core_2.10/artifactId version1.2.0/version /dependency
Re: WordCount example
Interesting, I see 0 cores in the UI? - *Cores:* 0 Total, 0 Used On Fri, Apr 3, 2015 at 2:55 PM, Tathagata Das t...@databricks.com wrote: What does the Spark Standalone UI at port 8080 say about number of cores? On Fri, Apr 3, 2015 at 2:53 PM, Mohit Anchlia mohitanch...@gmail.com wrote: [ec2-user@ip-10-241-251-232 s_lib]$ cat /proc/cpuinfo |grep process processor : 0 processor : 1 processor : 2 processor : 3 processor : 4 processor : 5 processor : 6 processor : 7 On Fri, Apr 3, 2015 at 2:33 PM, Tathagata Das t...@databricks.com wrote: How many cores are present in the works allocated to the standalone cluster spark://ip-10-241-251-232:7077 ? On Fri, Apr 3, 2015 at 2:18 PM, Mohit Anchlia mohitanch...@gmail.com wrote: If I use local[2] instead of *URL:* spark://ip-10-241-251-232:7077 this seems to work. I don't understand why though because when I give spark://ip-10-241-251-232:7077 application seem to bootstrap successfully, just doesn't create a socket on port ? On Fri, Mar 27, 2015 at 10:55 AM, Mohit Anchlia mohitanch...@gmail.com wrote: I checked the ports using netstat and don't see any connections established on that port. Logs show only this: 15/03/27 13:50:48 INFO Master: Registering app NetworkWordCount 15/03/27 13:50:48 INFO Master: Registered app NetworkWordCount with ID app-20150327135048-0002 Spark ui shows: Running Applications IDNameCoresMemory per NodeSubmitted TimeUserStateDuration app-20150327135048-0002 http://54.69.225.94:8080/app?appId=app-20150327135048-0002 NetworkWordCount http://ip-10-241-251-232.us-west-2.compute.internal:4040/0512.0 MB2015/03/27 13:50:48ec2-userWAITING33 s Code looks like is being executed: java -cp .:* org.spark.test.WordCount spark://ip-10-241-251-232:7077 *public* *static* *void* doWork(String masterUrl){ SparkConf conf = *new* SparkConf().setMaster(masterUrl).setAppName( NetworkWordCount); JavaStreamingContext *jssc* = *new* JavaStreamingContext(conf, Durations.*seconds*(1)); JavaReceiverInputDStreamString lines = jssc.socketTextStream( localhost, ); System.*out*.println(Successfully created connection); *mapAndReduce*(lines); jssc.start(); // Start the computation jssc.awaitTermination(); // Wait for the computation to terminate } *public* *static* *void* main(String ...args){ *doWork*(args[0]); } And output of the java program after submitting the task: java -cp .:* org.spark.test.WordCount spark://ip-10-241-251-232:7077 Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 15/03/27 13:50:46 INFO SecurityManager: Changing view acls to: ec2-user 15/03/27 13:50:46 INFO SecurityManager: Changing modify acls to: ec2-user 15/03/27 13:50:46 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(ec2-user); users with modify permissions: Set(ec2-user) 15/03/27 13:50:46 INFO Slf4jLogger: Slf4jLogger started 15/03/27 13:50:46 INFO Remoting: Starting remoting 15/03/27 13:50:47 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkdri...@ip-10-241-251-232.us-west-2.compute.internal :60184] 15/03/27 13:50:47 INFO Utils: Successfully started service 'sparkDriver' on port 60184. 
15/03/27 13:50:47 INFO SparkEnv: Registering MapOutputTracker 15/03/27 13:50:47 INFO SparkEnv: Registering BlockManagerMaster 15/03/27 13:50:47 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20150327135047-5399 15/03/27 13:50:47 INFO MemoryStore: MemoryStore started with capacity 3.5 GB 15/03/27 13:50:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 15/03/27 13:50:47 INFO HttpFileServer: HTTP File server directory is /tmp/spark-7e26df49-1520-4c77-b411-c837da59fa5b 15/03/27 13:50:47 INFO HttpServer: Starting HTTP Server 15/03/27 13:50:47 INFO Utils: Successfully started service 'HTTP file server' on port 57955. 15/03/27 13:50:47 INFO Utils: Successfully started service 'SparkUI' on port 4040. 15/03/27 13:50:47 INFO SparkUI: Started SparkUI at http://ip-10-241-251-232.us-west-2.compute.internal:4040 15/03/27 13:50:47 INFO AppClient$ClientActor: Connecting to master spark://ip-10-241-251-232:7077... 15/03/27 13:50:48 INFO SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20150327135048-0002 15/03/27 13:50:48 INFO NettyBlockTransferService: Server created on 58358 15/03/27 13:50:48 INFO BlockManagerMaster: Trying to register BlockManager 15/03/27 13:50:48 INFO BlockManagerMasterActor: Registering block manager ip-10-241-251-232.us-west-2.compute.internal:58358 with 3.5 GB RAM, BlockManagerId(driver, ip-10-241-251-232.us-west-2.compute.internal, 58358) 15/03/27 13:50:48 INFO BlockManagerMaster: Registered BlockManager 15/03/27 13:50:48 INFO SparkDeploySchedulerBackend
Re: WordCount example
If I use local[2] instead of *URL:* spark://ip-10-241-251-232:7077 this seems to work. I don't understand why though because when I give spark://ip-10-241-251-232:7077 application seem to bootstrap successfully, just doesn't create a socket on port ? On Fri, Mar 27, 2015 at 10:55 AM, Mohit Anchlia mohitanch...@gmail.com wrote: I checked the ports using netstat and don't see any connections established on that port. Logs show only this: 15/03/27 13:50:48 INFO Master: Registering app NetworkWordCount 15/03/27 13:50:48 INFO Master: Registered app NetworkWordCount with ID app-20150327135048-0002 Spark ui shows: Running Applications IDNameCoresMemory per NodeSubmitted TimeUserStateDuration app-20150327135048-0002 http://54.69.225.94:8080/app?appId=app-20150327135048-0002 NetworkWordCount http://ip-10-241-251-232.us-west-2.compute.internal:4040/0512.0 MB2015/03/27 13:50:48ec2-userWAITING33 s Code looks like is being executed: java -cp .:* org.spark.test.WordCount spark://ip-10-241-251-232:7077 *public* *static* *void* doWork(String masterUrl){ SparkConf conf = *new* SparkConf().setMaster(masterUrl).setAppName( NetworkWordCount); JavaStreamingContext *jssc* = *new* JavaStreamingContext(conf, Durations. *seconds*(1)); JavaReceiverInputDStreamString lines = jssc.socketTextStream(localhost, ); System.*out*.println(Successfully created connection); *mapAndReduce*(lines); jssc.start(); // Start the computation jssc.awaitTermination(); // Wait for the computation to terminate } *public* *static* *void* main(String ...args){ *doWork*(args[0]); } And output of the java program after submitting the task: java -cp .:* org.spark.test.WordCount spark://ip-10-241-251-232:7077 Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 15/03/27 13:50:46 INFO SecurityManager: Changing view acls to: ec2-user 15/03/27 13:50:46 INFO SecurityManager: Changing modify acls to: ec2-user 15/03/27 13:50:46 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(ec2-user); users with modify permissions: Set(ec2-user) 15/03/27 13:50:46 INFO Slf4jLogger: Slf4jLogger started 15/03/27 13:50:46 INFO Remoting: Starting remoting 15/03/27 13:50:47 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkdri...@ip-10-241-251-232.us-west-2.compute.internal :60184] 15/03/27 13:50:47 INFO Utils: Successfully started service 'sparkDriver' on port 60184. 15/03/27 13:50:47 INFO SparkEnv: Registering MapOutputTracker 15/03/27 13:50:47 INFO SparkEnv: Registering BlockManagerMaster 15/03/27 13:50:47 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20150327135047-5399 15/03/27 13:50:47 INFO MemoryStore: MemoryStore started with capacity 3.5 GB 15/03/27 13:50:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 15/03/27 13:50:47 INFO HttpFileServer: HTTP File server directory is /tmp/spark-7e26df49-1520-4c77-b411-c837da59fa5b 15/03/27 13:50:47 INFO HttpServer: Starting HTTP Server 15/03/27 13:50:47 INFO Utils: Successfully started service 'HTTP file server' on port 57955. 15/03/27 13:50:47 INFO Utils: Successfully started service 'SparkUI' on port 4040. 15/03/27 13:50:47 INFO SparkUI: Started SparkUI at http://ip-10-241-251-232.us-west-2.compute.internal:4040 15/03/27 13:50:47 INFO AppClient$ClientActor: Connecting to master spark://ip-10-241-251-232:7077... 
15/03/27 13:50:48 INFO SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20150327135048-0002 15/03/27 13:50:48 INFO NettyBlockTransferService: Server created on 58358 15/03/27 13:50:48 INFO BlockManagerMaster: Trying to register BlockManager 15/03/27 13:50:48 INFO BlockManagerMasterActor: Registering block manager ip-10-241-251-232.us-west-2.compute.internal:58358 with 3.5 GB RAM, BlockManagerId(driver, ip-10-241-251-232.us-west-2.compute.internal, 58358) 15/03/27 13:50:48 INFO BlockManagerMaster: Registered BlockManager 15/03/27 13:50:48 INFO SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0 15/03/27 13:50:48 INFO ReceiverTracker: ReceiverTracker started 15/03/27 13:50:48 INFO ForEachDStream: metadataCleanupDelay = -1 15/03/27 13:50:48 INFO ShuffledDStream: metadataCleanupDelay = -1 15/03/27 13:50:48 INFO MappedDStream: metadataCleanupDelay = -1 15/03/27 13:50:48 INFO FlatMappedDStream: metadataCleanupDelay = -1 15/03/27 13:50:48 INFO SocketInputDStream: metadataCleanupDelay = -1 15/03/27 13:50:48 INFO SocketInputDStream: Slide time = 1000 ms 15/03/27 13:50:48 INFO SocketInputDStream: Storage level = StorageLevel(false, false, false, false, 1) 15/03/27 13:50:48 INFO SocketInputDStream: Checkpoint interval = null 15/03/27 13:50:48 INFO SocketInputDStream: Remember duration = 1000 ms 15/03/27 13:50:48
Re: WordCount example
[ec2-user@ip-10-241-251-232 s_lib]$ cat /proc/cpuinfo |grep process processor : 0 processor : 1 processor : 2 processor : 3 processor : 4 processor : 5 processor : 6 processor : 7 On Fri, Apr 3, 2015 at 2:33 PM, Tathagata Das t...@databricks.com wrote: How many cores are present in the works allocated to the standalone cluster spark://ip-10-241-251-232:7077 ? On Fri, Apr 3, 2015 at 2:18 PM, Mohit Anchlia mohitanch...@gmail.com wrote: If I use local[2] instead of *URL:* spark://ip-10-241-251-232:7077 this seems to work. I don't understand why though because when I give spark://ip-10-241-251-232:7077 application seem to bootstrap successfully, just doesn't create a socket on port ? On Fri, Mar 27, 2015 at 10:55 AM, Mohit Anchlia mohitanch...@gmail.com wrote: I checked the ports using netstat and don't see any connections established on that port. Logs show only this: 15/03/27 13:50:48 INFO Master: Registering app NetworkWordCount 15/03/27 13:50:48 INFO Master: Registered app NetworkWordCount with ID app-20150327135048-0002 Spark ui shows: Running Applications IDNameCoresMemory per NodeSubmitted TimeUserStateDuration app-20150327135048-0002 http://54.69.225.94:8080/app?appId=app-20150327135048-0002 NetworkWordCount http://ip-10-241-251-232.us-west-2.compute.internal:4040/0512.0 MB2015/03/27 13:50:48ec2-userWAITING33 s Code looks like is being executed: java -cp .:* org.spark.test.WordCount spark://ip-10-241-251-232:7077 *public* *static* *void* doWork(String masterUrl){ SparkConf conf = *new* SparkConf().setMaster(masterUrl).setAppName( NetworkWordCount); JavaStreamingContext *jssc* = *new* JavaStreamingContext(conf, Durations.*seconds*(1)); JavaReceiverInputDStreamString lines = jssc.socketTextStream( localhost, ); System.*out*.println(Successfully created connection); *mapAndReduce*(lines); jssc.start(); // Start the computation jssc.awaitTermination(); // Wait for the computation to terminate } *public* *static* *void* main(String ...args){ *doWork*(args[0]); } And output of the java program after submitting the task: java -cp .:* org.spark.test.WordCount spark://ip-10-241-251-232:7077 Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 15/03/27 13:50:46 INFO SecurityManager: Changing view acls to: ec2-user 15/03/27 13:50:46 INFO SecurityManager: Changing modify acls to: ec2-user 15/03/27 13:50:46 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(ec2-user); users with modify permissions: Set(ec2-user) 15/03/27 13:50:46 INFO Slf4jLogger: Slf4jLogger started 15/03/27 13:50:46 INFO Remoting: Starting remoting 15/03/27 13:50:47 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkdri...@ip-10-241-251-232.us-west-2.compute.internal :60184] 15/03/27 13:50:47 INFO Utils: Successfully started service 'sparkDriver' on port 60184. 15/03/27 13:50:47 INFO SparkEnv: Registering MapOutputTracker 15/03/27 13:50:47 INFO SparkEnv: Registering BlockManagerMaster 15/03/27 13:50:47 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20150327135047-5399 15/03/27 13:50:47 INFO MemoryStore: MemoryStore started with capacity 3.5 GB 15/03/27 13:50:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... 
using builtin-java classes where applicable 15/03/27 13:50:47 INFO HttpFileServer: HTTP File server directory is /tmp/spark-7e26df49-1520-4c77-b411-c837da59fa5b 15/03/27 13:50:47 INFO HttpServer: Starting HTTP Server 15/03/27 13:50:47 INFO Utils: Successfully started service 'HTTP file server' on port 57955. 15/03/27 13:50:47 INFO Utils: Successfully started service 'SparkUI' on port 4040. 15/03/27 13:50:47 INFO SparkUI: Started SparkUI at http://ip-10-241-251-232.us-west-2.compute.internal:4040 15/03/27 13:50:47 INFO AppClient$ClientActor: Connecting to master spark://ip-10-241-251-232:7077... 15/03/27 13:50:48 INFO SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20150327135048-0002 15/03/27 13:50:48 INFO NettyBlockTransferService: Server created on 58358 15/03/27 13:50:48 INFO BlockManagerMaster: Trying to register BlockManager 15/03/27 13:50:48 INFO BlockManagerMasterActor: Registering block manager ip-10-241-251-232.us-west-2.compute.internal:58358 with 3.5 GB RAM, BlockManagerId(driver, ip-10-241-251-232.us-west-2.compute.internal, 58358) 15/03/27 13:50:48 INFO BlockManagerMaster: Registered BlockManager 15/03/27 13:50:48 INFO SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0 15/03/27 13:50:48 INFO ReceiverTracker: ReceiverTracker started 15/03/27 13:50:48 INFO ForEachDStream: metadataCleanupDelay = -1 15/03/27 13:50:48 INFO ShuffledDStream: metadataCleanupDelay = -1 15/03/27 13:50:48 INFO
Re: WordCount example
I tried to file a bug in git repo however I don't see a link to open issues On Fri, Mar 27, 2015 at 10:55 AM, Mohit Anchlia mohitanch...@gmail.com wrote: I checked the ports using netstat and don't see any connections established on that port. Logs show only this: 15/03/27 13:50:48 INFO Master: Registering app NetworkWordCount 15/03/27 13:50:48 INFO Master: Registered app NetworkWordCount with ID app-20150327135048-0002 Spark ui shows: Running Applications IDNameCoresMemory per NodeSubmitted TimeUserStateDuration app-20150327135048-0002 http://54.69.225.94:8080/app?appId=app-20150327135048-0002 NetworkWordCount http://ip-10-241-251-232.us-west-2.compute.internal:4040/0512.0 MB2015/03/27 13:50:48ec2-userWAITING33 s Code looks like is being executed: java -cp .:* org.spark.test.WordCount spark://ip-10-241-251-232:7077 *public* *static* *void* doWork(String masterUrl){ SparkConf conf = *new* SparkConf().setMaster(masterUrl).setAppName( NetworkWordCount); JavaStreamingContext *jssc* = *new* JavaStreamingContext(conf, Durations. *seconds*(1)); JavaReceiverInputDStreamString lines = jssc.socketTextStream(localhost, ); System.*out*.println(Successfully created connection); *mapAndReduce*(lines); jssc.start(); // Start the computation jssc.awaitTermination(); // Wait for the computation to terminate } *public* *static* *void* main(String ...args){ *doWork*(args[0]); } And output of the java program after submitting the task: java -cp .:* org.spark.test.WordCount spark://ip-10-241-251-232:7077 Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 15/03/27 13:50:46 INFO SecurityManager: Changing view acls to: ec2-user 15/03/27 13:50:46 INFO SecurityManager: Changing modify acls to: ec2-user 15/03/27 13:50:46 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(ec2-user); users with modify permissions: Set(ec2-user) 15/03/27 13:50:46 INFO Slf4jLogger: Slf4jLogger started 15/03/27 13:50:46 INFO Remoting: Starting remoting 15/03/27 13:50:47 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkdri...@ip-10-241-251-232.us-west-2.compute.internal :60184] 15/03/27 13:50:47 INFO Utils: Successfully started service 'sparkDriver' on port 60184. 15/03/27 13:50:47 INFO SparkEnv: Registering MapOutputTracker 15/03/27 13:50:47 INFO SparkEnv: Registering BlockManagerMaster 15/03/27 13:50:47 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20150327135047-5399 15/03/27 13:50:47 INFO MemoryStore: MemoryStore started with capacity 3.5 GB 15/03/27 13:50:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 15/03/27 13:50:47 INFO HttpFileServer: HTTP File server directory is /tmp/spark-7e26df49-1520-4c77-b411-c837da59fa5b 15/03/27 13:50:47 INFO HttpServer: Starting HTTP Server 15/03/27 13:50:47 INFO Utils: Successfully started service 'HTTP file server' on port 57955. 15/03/27 13:50:47 INFO Utils: Successfully started service 'SparkUI' on port 4040. 15/03/27 13:50:47 INFO SparkUI: Started SparkUI at http://ip-10-241-251-232.us-west-2.compute.internal:4040 15/03/27 13:50:47 INFO AppClient$ClientActor: Connecting to master spark://ip-10-241-251-232:7077... 
15/03/27 13:50:48 INFO SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20150327135048-0002 15/03/27 13:50:48 INFO NettyBlockTransferService: Server created on 58358 15/03/27 13:50:48 INFO BlockManagerMaster: Trying to register BlockManager 15/03/27 13:50:48 INFO BlockManagerMasterActor: Registering block manager ip-10-241-251-232.us-west-2.compute.internal:58358 with 3.5 GB RAM, BlockManagerId(driver, ip-10-241-251-232.us-west-2.compute.internal, 58358) 15/03/27 13:50:48 INFO BlockManagerMaster: Registered BlockManager 15/03/27 13:50:48 INFO SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0 15/03/27 13:50:48 INFO ReceiverTracker: ReceiverTracker started 15/03/27 13:50:48 INFO ForEachDStream: metadataCleanupDelay = -1 15/03/27 13:50:48 INFO ShuffledDStream: metadataCleanupDelay = -1 15/03/27 13:50:48 INFO MappedDStream: metadataCleanupDelay = -1 15/03/27 13:50:48 INFO FlatMappedDStream: metadataCleanupDelay = -1 15/03/27 13:50:48 INFO SocketInputDStream: metadataCleanupDelay = -1 15/03/27 13:50:48 INFO SocketInputDStream: Slide time = 1000 ms 15/03/27 13:50:48 INFO SocketInputDStream: Storage level = StorageLevel(false, false, false, false, 1) 15/03/27 13:50:48 INFO SocketInputDStream: Checkpoint interval = null 15/03/27 13:50:48 INFO SocketInputDStream: Remember duration = 1000 ms 15/03/27 13:50:48 INFO SocketInputDStream: Initialized and validated org.apache.spark.streaming.dstream.SocketInputDStream@75efa13d 15/03/27 13:50:48 INFO FlatMappedDStream: Slide time = 1000 ms 15
Re: WordCount example
INFO TaskSchedulerImpl: Adding task set 3.0 with 1 tasks 15/03/27 13:50:50 INFO JobScheduler: Added jobs for time 142747865 ms 15/03/27 13:50:51 INFO JobScheduler: Added jobs for time 1427478651000 ms 15/03/27 13:50:52 INFO JobScheduler: Added jobs for time 1427478652000 ms 15/03/27 13:50:53 IN
On Thu, Mar 26, 2015 at 6:50 PM, Saisai Shao sai.sai.s...@gmail.com wrote: Hi, did you run the word count example in Spark local mode or another mode? In local mode you have to set local[n], where n >= 2. For other modes, make sure the number of available cores is larger than 1, because the receiver inside Spark Streaming is wrapped as a long-running task, which will occupy at least one core. Besides, use lsof -p pid or netstat to make sure the Spark executor backend is connected to the nc process. Also grep the executor's log to see if there are log lines like "Connecting to host port" and "Connected to host port", which show that the receiver is correctly connected to the nc process. Thanks, Jerry
2015-03-27 8:45 GMT+08:00 Mohit Anchlia mohitanch...@gmail.com: What's the best way to troubleshoot inside spark to see why Spark is not connecting to nc on port ? I don't see any errors either.
On Thu, Mar 26, 2015 at 2:38 PM, Mohit Anchlia mohitanch...@gmail.com wrote: I am trying to run the word count example but for some reason it's not working as expected. I start the nc server on port and then submit the spark job to the cluster. The Spark job gets submitted successfully, but I never see any connection from Spark getting established. I also tried to type words on the console where nc is listening and waiting at the prompt; however, I don't see any output. I also don't see any errors. Here is the conf:
SparkConf conf = new SparkConf().setMaster(masterUrl).setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", );
Re: WordCount example
What's the best way to troubleshoot inside spark to see why Spark is not connecting to nc on port ? I don't see any errors either.
On Thu, Mar 26, 2015 at 2:38 PM, Mohit Anchlia mohitanch...@gmail.com wrote: I am trying to run the word count example but for some reason it's not working as expected. I start the nc server on port and then submit the spark job to the cluster. The Spark job gets submitted successfully, but I never see any connection from Spark getting established. I also tried to type words on the console where nc is listening and waiting at the prompt; however, I don't see any output. I also don't see any errors. Here is the conf:
SparkConf conf = new SparkConf().setMaster(masterUrl).setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", );
WordCount example
I am trying to run the word count example but for some reason it's not working as expected. I start the nc server on port and then submit the spark job to the cluster. The Spark job gets submitted successfully, but I never see any connection from Spark getting established. I also tried to type words on the console where nc is listening and waiting at the prompt; however, I don't see any output. I also don't see any errors. Here is the conf:
SparkConf conf = new SparkConf().setMaster(masterUrl).setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", );
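A minimal sketch that often helps isolate this kind of problem: run the same pipeline against local[2] (so one core is left for processing besides the receiver) and simply print each batch. The port 9999 below is only an assumption for illustration; use whatever port nc is actually listening on.
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class LocalWordCountCheck {
    public static void main(String[] args) throws Exception {
        // local[2]: one core for the socket receiver, one for processing the batches
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
        // 9999 is an assumed port; start netcat first with: nc -lk 9999
        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
        lines.print();   // print the first few lines of every batch to confirm data is flowing
        jssc.start();
        jssc.awaitTermination();
    }
}
If this works locally but not against the standalone master, the usual suspect is that the application was not allocated any cores (the Spark UI output earlier in this thread shows the app WAITING with 0 cores), so the receiver never gets to run.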
akka.version error
I am facing the same issue as listed here: http://apache-spark-user-list.1001560.n3.nabble.com/Packaging-a-spark-job-using-maven-td5615.html The solution mentioned is here: https://gist.github.com/prb/d776a47bd164f704eecb However, there are a few things I don't understand: 1) Why are the jars being split into worker and driver jars? 2) Does it mean I now need to create 2 jars? 3) I am assuming I still need both jars on the path when I run this job? I am simply trying to execute a basic word count example.
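For what it's worth, the akka.version error usually shows up when a single fat jar is built and the Akka reference.conf resources from different jars overwrite each other instead of being merged. One common workaround, sketched below with the maven-shade-plugin, is to merge the reference.conf files while shading; this is only an illustration of that idea and not necessarily what the linked gist does.
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <transformers>
          <!-- Merge every reference.conf so Akka still finds akka.version at runtime -->
          <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
            <resource>reference.conf</resource>
          </transformer>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>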
Re: Spark streaming alerting
I think I didn't explain myself properly :) What I meant to say was that generally a spark worker runs either on HDFS data nodes or on Cassandra nodes, which are typically in a private (protected) network. When a condition is matched it's difficult to send out the alerts directly from the worker nodes because of the security concerns. I was wondering if there is a way to listen to the events as they occur at the sliding-window scale, or is the best way to accomplish this to post back to a queue?
On Mon, Mar 23, 2015 at 2:22 AM, Khanderao Kand Gmail khanderao.k...@gmail.com wrote: Akhil, you are right in your answer to what Mohit wrote. However, what Mohit seems to be alluding to, but did not write properly, might be different. Mohit, you are wrong in saying that streaming generally works on HDFS and Cassandra. Streaming typically works with a streaming or queuing source like Kafka, Kinesis, Twitter, Flume, ZeroMQ, etc. (but it can also read from HDFS and S3). However, the streaming context (the receiver within the streaming context) gets events/messages/records and forms a time-window-based batch (RDD), so there is a maximum gap of the window time between when an alert message was available to Spark and when the processing happens. I think this is what you meant. As per the Spark programming model, the RDD is the right way to deal with the data. If you are fine with a minimum delay of, say, a second (based on the minimum time window that DStreams can support), then what Rohit gave is the right model. Khanderao
On Mar 22, 2015, at 11:39 PM, Akhil Das ak...@sigmoidanalytics.com wrote: What do you mean you can't send it directly from spark workers? Here's a simple approach which you could take:
val data = ssc.textFileStream("sigmoid/")
val dist = data.filter(_.contains("ERROR")).foreachRDD(rdd => alert("Errors : " + rdd.count()))
And the alert() function could be anything triggering an email or sending an SMS alert. Thanks, Best Regards
On Sun, Mar 22, 2015 at 1:52 AM, Mohit Anchlia mohitanch...@gmail.com wrote: Is there a module in spark streaming that lets you listen to the alerts/conditions as they happen in the streaming module? Generally spark streaming components will execute on a large set of clusters like HDFS or Cassandra; however, when it comes to alerting you generally can't send it directly from the spark workers, which means you need a way to listen to the alerts.
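One way to act on the "post back to a queue" idea is to do the matching inside foreachRDD and publish only the summary from the driver, which usually sits somewhere that is allowed to reach the alerting endpoint. A rough Java sketch built on a text DStream like the lines stream from the earlier examples; AlertQueueClient and its publish method are hypothetical placeholders for a Kafka/SQS/JMS producer, and the foreachRDD signature shown is the Spark 1.x-era one.
// Needs: org.apache.spark.api.java.JavaRDD, org.apache.spark.api.java.function.Function,
//        org.apache.spark.streaming.api.java.JavaDStream
// The count is an action, so its result comes back to the driver; publish() below
// therefore runs on the driver, not on the worker nodes.
JavaDStream<String> errors = lines.filter(
    new Function<String, Boolean>() {
        @Override
        public Boolean call(String line) {
            return line.contains("ERROR");
        }
    });
errors.foreachRDD(
    new Function<JavaRDD<String>, Void>() {
        @Override
        public Void call(JavaRDD<String> rdd) {
            long count = rdd.count();
            if (count > 0) {
                AlertQueueClient.publish("Errors in last batch: " + count); // hypothetical client
            }
            return null;
        }
    });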
Re: Load balancing
posting my question again :) Thanks for the pointer. Looking at the description below from the site, it looks like in Spark the block size is not fixed; it's determined by the block interval, and in fact for the same batch you could have different block sizes. Did I get it right?
- Another parameter that should be considered is the receiver's blocking interval, which is determined by the configuration parameter spark.streaming.blockInterval (http://spark.apache.org/docs/1.2.1/configuration.html#spark-streaming). For most receivers, the received data is coalesced together into blocks of data before storing inside Spark's memory. The number of blocks in each batch determines the number of tasks that will be used to process the received data in a map-like transformation. The number of tasks per receiver per batch will be approximately (batch interval / block interval). For example, a block interval of 200 ms will create 10 tasks per 2 second batch. If the number of tasks is too low (that is, less than the number of cores per machine), then it will be inefficient as all available cores will not be used to process the data. To increase the number of tasks for a given batch interval, reduce the block interval. However, the recommended minimum value of the block interval is about 50 ms, below which the task launching overheads may be a problem. --
Also, I am not clear about the data flow of the receiver. When the client gets a handle to a spark context and calls something like val lines = ssc.socketTextStream("localhost", ), is this the point when the spark master is contacted to determine which spark worker node the data is going to go to?
On Sun, Mar 22, 2015 at 2:10 AM, Jeffrey Jedele jeffrey.jed...@gmail.com wrote: Hi Mohit, please make sure you use the Reply to all button and include the mailing list, otherwise only I will get your message ;) Regarding your question: Yes, that's also my understanding. You can partition streaming RDDs only by time intervals, not by size. So depending on your incoming rate, they may vary. I do not know exactly what the life cycle of the receiver is, but I don't think anything actually happens when you create the DStream. My guess would be that the receiver is allocated when you call StreamingContext#startStreams(). Regards, Jeff
2015-03-21 21:19 GMT+01:00 Mohit Anchlia mohitanch...@gmail.com: Could somebody help me understand the question I posted earlier?
On Fri, Mar 20, 2015 at 9:44 AM, Mohit Anchlia mohitanch...@gmail.com wrote: Thanks for the pointer. Looking at the description below from the site, it looks like in Spark the block size is not fixed; it's determined by the block interval, and in fact for the same batch you could have different block sizes. Did I get it right?
- Another parameter that should be considered is the receiver's blocking interval, which is determined by the configuration parameter spark.streaming.blockInterval (http://spark.apache.org/docs/1.2.1/configuration.html#spark-streaming). For most receivers, the received data is coalesced together into blocks of data before storing inside Spark's memory. The number of blocks in each batch determines the number of tasks that will be used to process the received data in a map-like transformation. The number of tasks per receiver per batch will be approximately (batch interval / block interval). For example, a block interval of 200 ms will create 10 tasks per 2 second batch.
If the number of tasks is too low (that is, less than the number of cores per machine), then it will be inefficient as all available cores will not be used to process the data. To increase the number of tasks for a given batch interval, reduce the block interval. However, the recommended minimum value of the block interval is about 50 ms, below which the task launching overheads may be a problem. --
Also, I am not clear about the data flow of the receiver. When the client gets a handle to a spark context and calls something like val lines = ssc.socketTextStream("localhost", ), is this the point when the spark master is contacted to determine which spark worker node the data is going to go to?
On Fri, Mar 20, 2015 at 1:33 AM, Jeffrey Jedele jeffrey.jed...@gmail.com wrote: Hi Mohit, it also depends on what the source for your streaming application is. If you use Kafka, you can easily partition topics and have multiple receivers on different machines. If you have something like an HTTP, socket, etc. stream, you probably can't do that. The Spark RDDs generated by your receiver will however be partitioned and processed in a distributed manner like usual Spark RDDs. There are parameters to control that behavior (e.g. defaultParallelism and blockInterval). See here for more details: http://spark.apache.org/docs/1.2.1/streaming-programming-guide.html#performance-tuning Regards, Jeff
2015-03-20 8:02 GMT+01:00 Akhil Das ak...@sigmoidanalytics.com: 1. If you are consuming
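To make the arithmetic above concrete, here is a sketch of how the two knobs are set; the values are only example numbers. With a 2-second batch and a 200 ms block interval, each receiver produces about 2000/200 = 10 blocks, and therefore about 10 map tasks, per batch.
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

// Example values only: 2 s batches split into 200 ms blocks => ~10 tasks per receiver per batch.
// In the Spark 1.2.x configuration docs this property is given in milliseconds.
SparkConf conf = new SparkConf()
        .setMaster("local[4]")
        .setAppName("BlockIntervalExample")
        .set("spark.streaming.blockInterval", "200");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(2));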
Spark streaming alerting
Is there a module in spark streaming that lets you listen to the alerts/conditions as they happen in the streaming module? Generally spark streaming components will execute on large set of clusters like hdfs or Cassandra, however when it comes to alerting you generally can't send it directly from the spark workers, which means you need a way to listen to the alerts.
Load balancing
I am trying to understand how to load balance incoming data across multiple spark streaming workers. Could somebody help me understand how I can distribute data from various sources so that it flows to multiple spark streaming nodes? Is it done by the spark client with the help of the spark master, similar to a hadoop client asking the namenode for the list of datanodes?
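One pattern that addresses this (a sketch, assuming the source can be consumed from more than one endpoint) is to create several input streams, each backed by its own receiver that Spark schedules on a worker, and union them into one logical stream; the host names and port below are placeholders.
// Each socketTextStream call creates its own receiver; Spark places each receiver on a
// worker, so ingestion is spread across the cluster. Host names and port are placeholders.
JavaReceiverInputDStream<String> stream1 = jssc.socketTextStream("source-host-1", 9999);
JavaReceiverInputDStream<String> stream2 = jssc.socketTextStream("source-host-2", 9999);
JavaDStream<String> merged = stream1.union(stream2);
// Downstream transformations on 'merged' then run as ordinary distributed Spark tasks.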
Unable to connect
I am running spark streaming standalone in ec2 and I am trying to run the wordcount example from my desktop. The program is unable to connect to the master; in the logs I see the following, which seems to be a hostname issue: 15/03/13 17:37:44 ERROR EndpointWriter: dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://sparkMaster@54.69.22.4:7077/]] arriving at [akka.tcp://sparkMaster@54.69.22.4:7077] inbound addresses are [akka.tcp://sparkMaster@ip-10-241-251-232:7077]
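Not a definitive fix, but that log line usually means the master bound itself to the EC2-internal hostname (ip-10-241-251-232) while the driver dialed the public IP (54.69.22.4), and Akka rejects the mismatch. A sketch of the idea, assuming the driver can resolve the internal name; otherwise the master can be started with SPARK_MASTER_IP set to an externally reachable address in spark-env.sh.
// The master URL must match, character for character, the address shown at the top of the
// master's web UI / logs (here the internal EC2 hostname, which is an assumption):
SparkConf conf = new SparkConf()
        .setMaster("spark://ip-10-241-251-232:7077")
        .setAppName("NetworkWordCount");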
Re: Partitioning
I still don't follow how spark partitions data in a multi-node environment. Is there a document on how spark does partitioning of data? For eg: in the word count example, how is spark distributing words to multiple nodes?
On Fri, Mar 13, 2015 at 3:01 PM, Tathagata Das t...@databricks.com wrote: If you want to access the keys in an RDD that is partitioned by key, then you can use RDD.mapPartition(), which gives you access to the whole partition as an iterator of key-value pairs. You have the option of maintaining the partitioning information or not by setting the preservePartitioning flag in mapPartition (see docs). But use it at your own risk. If you modify the keys and yet preserve partitioning, the partitioning would not make sense any more as the hash of the keys has changed. TD
On Fri, Mar 13, 2015 at 2:26 PM, Mohit Anchlia mohitanch...@gmail.com wrote: I am trying to look for documentation on partitioning, which I can't seem to find. I am looking at spark streaming and was wondering how it partitions RDDs in a multi-node environment. Where are the keys defined that are used for partitioning? For instance, in the example below the keys seem to be implicit: which one is the key and which one is the value? Or is it called a flatMap because there are no keys?
// Split each line into words
JavaDStream<String> words = lines.flatMap(
    new FlatMapFunction<String, String>() {
        @Override
        public Iterable<String> call(String x) {
            return Arrays.asList(x.split(" "));
        }
    });
And are keys available inside of Function2 in case they're required for a given use case?
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(
    new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer i1, Integer i2) throws Exception {
            return i1 + i2;
        }
    });
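To make the implicit keys explicit (a sketch built on the same word-count pipeline): flatMap emits plain values with no key at all; the key only appears when mapToPair wraps each word in a Tuple2, and reduceByKey then shuffles records by the hash of that key. The optional numPartitions argument below (4 is just an example) controls how many partitions, and hence reduce tasks, the result has.
// Needs: scala.Tuple2, org.apache.spark.api.java.function.PairFunction,
//        org.apache.spark.api.java.function.Function2
// The word becomes the key and 1 the value; this is where the partitioning key is defined.
JavaPairDStream<String, Integer> pairs = words.mapToPair(
    new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String word) {
            return new Tuple2<String, Integer>(word, 1);
        }
    });
// reduceByKey hash-partitions by the Tuple2 key; 4 partitions is an example value.
JavaPairDStream<String, Integer> counts = pairs.reduceByKey(
    new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer i1, Integer i2) {
            return i1 + i2;
        }
    }, 4);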
Compilation error
I am trying out the streaming example as documented and I am using spark 1.2.1 streaming from maven for Java. When I add this code I get a compilation error and eclipse is not able to recognize Tuple2. I also don't see the scala.Tuple2 class available to import. http://spark.apache.org/docs/1.2.0/streaming-programming-guide.html#a-quick-example
private void map(JavaReceiverInputDStream<String> lines) {
    JavaDStream<String> words = lines.flatMap(
        new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String x) {
                return Arrays.asList(x.split(" "));
            }
        });
    // Count each word in each batch
    JavaPairDStream<String, Integer> pairs = words.map(
        new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });
}
Re: Compilation error
It works after sync, thanks for the pointers On Tue, Mar 10, 2015 at 1:22 PM, Mohit Anchlia mohitanch...@gmail.com wrote: I navigated to maven dependency and found scala library. I also found Tuple2.class and when I click on it in eclipse I get invalid LOC header (bad signature) java.util.zip.ZipException: invalid LOC header (bad signature) at java.util.zip.ZipFile.read(Native Method) I am wondering if I should delete that file from local repo and re-sync On Tue, Mar 10, 2015 at 1:08 PM, Mohit Anchlia mohitanch...@gmail.com wrote: I ran the dependency command and see the following dependencies: I only see org.scala-lang. [INFO] org.spark.test:spak-test:jar:0.0.1-SNAPSHOT [INFO] +- org.apache.spark:spark-streaming_2.10:jar:1.2.0:compile [INFO] | +- org.eclipse.jetty:jetty-server:jar:8.1.14.v20131031:compile [INFO] | | +- org.eclipse.jetty.orbit:javax.servlet:jar:3.0.0.v201112011016:co mpile [INFO] | | +- org.eclipse.jetty:jetty-continuation:jar:8.1.14.v20131031:compil e [INFO] | | \- org.eclipse.jetty:jetty-http:jar:8.1.14.v20131031:compile [INFO] | | \- org.eclipse.jetty:jetty-io:jar:8.1.14.v20131031:compile [INFO] | +- org.scala-lang:scala-library:jar:2.10.4:compile [INFO] | \- org.spark-project.spark:unused:jar:1.0.0:compile [INFO] \- org.apache.spark:spark-core_2.10:jar:1.2.1:compile [INFO] +- com.twitter:chill_2.10:jar:0.5.0:compile [INFO] | \- com.esotericsoftware.kryo:kryo:jar:2.21:compile [INFO] | +- com.esotericsoftware.reflectasm:reflectasm:jar:shaded:1.07:co mpile [INFO] | +- com.esotericsoftware.minlog:minlog:jar:1.2:compile [INFO] | \- org.objenesis:objenesis:jar:1.2:compile [INFO] +- com.twitter:chill-java:jar:0.5.0:compile [INFO] +- org.apache.hadoop:hadoop-client:jar:2.2.0:compile [INFO] | +- org.apache.hadoop:hadoop-common:jar:2.2.0:compile [INFO] | | +- commons-cli:commons-cli:jar:1.2:compile [INFO] | | +- org.apache.commons:commons-math:jar:2.1:compile [INFO] | | +- xmlenc:xmlenc:jar:0.52:compile [INFO] | | +- commons-io:commons-io:jar:2.1:compile [INFO] | | +- commons-logging:commons-logging:jar:1.1.1:compile [INFO] | | +- commons-lang:commons-lang:jar:2.5:compile [INFO] | | +- commons-configuration:commons-configuration:jar:1.6:compile [INFO] | | | +- commons-collections:commons-collections:jar:3.2.1:compile [INFO] | | | +- commons-digester:commons-digester:jar:1.8:compile [INFO] | | | | \- commons-beanutils:commons-beanutils:jar:1.7.0:compile [INFO] | | | \- commons-beanutils:commons-beanutils-core:jar:1.8.0:compile [INFO] | | +- org.codehaus.jackson:jackson-core-asl:jar:1.8.8:compile [INFO] | | +- org.codehaus.jackson:jackson-mapper-asl:jar:1.8.8:compile [INFO] | | +- org.apache.avro:avro:jar:1.7.4:compile [INFO] | | +- com.google.protobuf:protobuf-java:jar:2.5.0:compile [INFO] | | +- org.apache.hadoop:hadoop-auth:jar:2.2.0:compile [INFO] | | \- org.apache.commons:commons-compress:jar:1.4.1:compile [INFO] | | \- org.tukaani:xz:jar:1.0:compile [INFO] | +- org.apache.hadoop:hadoop-hdfs:jar:2.2.0:compile [INFO] | | \- org.mortbay.jetty:jetty-util:jar:6.1.26:compile [INFO] | +- org.apache.hadoop:hadoop-mapreduce-client-app:jar:2.2.0:compile [INFO] | | +- org.apache.hadoop:hadoop-mapreduce-client-common:jar:2.2.0:co mpile [INFO] | | | +- org.apache.hadoop:hadoop-yarn-client:jar:2.2.0:compile [INFO] | | | | +- com.google.inject:guice:jar:3.0:compile [INFO] | | | | | +- javax.inject:javax.inject:jar:1:compile [INFO] | | | | | \- aopalliance:aopalliance:jar:1.0:compile [INFO] | | | | +- com.sun.jersey.jersey-test-framework:jersey-test-framew ork-grizzly2:jar:1.9:compile 
[INFO] | | | | | +- com.sun.jersey.jersey-test-framework:jersey-test-fra mework-core:jar:1.9:compile [INFO] | | | | | | +- javax.servlet:javax.servlet-api:jar:3.0.1:compile [INFO] | | | | | | \- com.sun.jersey:jersey-client:jar:1.9:compile [INFO] | | | | | \- com.sun.jersey:jersey-grizzly2:jar:1.9:compile [INFO] | | | | | +- org.glassfish.grizzly:grizzly-http:jar:2.1.2:comp ile [INFO] | | | | | | \- org.glassfish.grizzly:grizzly-framework:jar:2. 1.2:compile [INFO] | | | | | | \- org.glassfish.gmbal:gmbal-api-only:jar:3.0. 0-b023:compile [INFO] | | | | | | \- org.glassfish.external:management-api:ja r:3.0.0-b012:compile [INFO] | | | | | +- org.glassfish.grizzly:grizzly-http-server:jar:2.1 .2:compile [INFO] | | | | | | \- org.glassfish.grizzly:grizzly-rcm:jar:2.1.2:co mpile [INFO] | | | | | +- org.glassfish.grizzly:grizzly-http-servlet:jar:2. 1.2:compile [INFO] | | | | | \- org.glassfish:javax.servlet:jar:3.1:compile [INFO] | | | | +- com.sun.jersey:jersey-server:jar:1.9:compile [INFO] | | | | | +- asm:asm:jar:3.1:compile [INFO] | | | | | \- com.sun.jersey:jersey-core:jar:1.9:compile [INFO] | | | | +- com.sun.jersey:jersey-json:jar:1.9:compile [INFO] | | | | | +- org.codehaus.jettison:jettison:jar:1.1:compile [INFO] | | | | | | \- stax:stax-api:jar:1.0.1:compile [INFO
Architecture Documentation
Is there a good architecture doc that gives a sufficient overview of high level and low level details of spark with some good diagrams?
Re: Compilation error
How do I do that? I haven't used Scala before. Also, the linking page doesn't mention it: http://spark.apache.org/docs/1.2.0/streaming-programming-guide.html#linking
On Tue, Mar 10, 2015 at 10:57 AM, Sean Owen so...@cloudera.com wrote: It means you do not have the Scala library classes in your project classpath.
On Tue, Mar 10, 2015 at 5:54 PM, Mohit Anchlia mohitanch...@gmail.com wrote: I am trying out the streaming example as documented and I am using spark 1.2.1 streaming from maven for Java. When I add this code I get a compilation error and eclipse is not able to recognize Tuple2. I also don't see the scala.Tuple2 class available to import. http://spark.apache.org/docs/1.2.0/streaming-programming-guide.html#a-quick-example
private void map(JavaReceiverInputDStream<String> lines) {
    JavaDStream<String> words = lines.flatMap(
        new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String x) {
                return Arrays.asList(x.split(" "));
            }
        });
    // Count each word in each batch
    JavaPairDStream<String, Integer> pairs = words.map(
        new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });
}
Compilation error on JavaPairDStream
I am getting the following error. When I look at the sources it seems to be a Scala source, but I am not sure why it's complaining about it. The method map(Function<String,R>) in the type JavaDStream<String> is not applicable for the arguments (new PairFunction<String,String,Integer>(){}) And my code has been taken from the spark examples site:
JavaPairDStream<String, Integer> pairs = words.map(
    new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String s) throws Exception {
            return new Tuple2<String, Integer>(s, 1);
        }
    });
Re: Compilation error
I navigated to maven dependency and found scala library. I also found Tuple2.class and when I click on it in eclipse I get invalid LOC header (bad signature) java.util.zip.ZipException: invalid LOC header (bad signature) at java.util.zip.ZipFile.read(Native Method) I am wondering if I should delete that file from local repo and re-sync On Tue, Mar 10, 2015 at 1:08 PM, Mohit Anchlia mohitanch...@gmail.com wrote: I ran the dependency command and see the following dependencies: I only see org.scala-lang. [INFO] org.spark.test:spak-test:jar:0.0.1-SNAPSHOT [INFO] +- org.apache.spark:spark-streaming_2.10:jar:1.2.0:compile [INFO] | +- org.eclipse.jetty:jetty-server:jar:8.1.14.v20131031:compile [INFO] | | +- org.eclipse.jetty.orbit:javax.servlet:jar:3.0.0.v201112011016:co mpile [INFO] | | +- org.eclipse.jetty:jetty-continuation:jar:8.1.14.v20131031:compil e [INFO] | | \- org.eclipse.jetty:jetty-http:jar:8.1.14.v20131031:compile [INFO] | | \- org.eclipse.jetty:jetty-io:jar:8.1.14.v20131031:compile [INFO] | +- org.scala-lang:scala-library:jar:2.10.4:compile [INFO] | \- org.spark-project.spark:unused:jar:1.0.0:compile [INFO] \- org.apache.spark:spark-core_2.10:jar:1.2.1:compile [INFO] +- com.twitter:chill_2.10:jar:0.5.0:compile [INFO] | \- com.esotericsoftware.kryo:kryo:jar:2.21:compile [INFO] | +- com.esotericsoftware.reflectasm:reflectasm:jar:shaded:1.07:co mpile [INFO] | +- com.esotericsoftware.minlog:minlog:jar:1.2:compile [INFO] | \- org.objenesis:objenesis:jar:1.2:compile [INFO] +- com.twitter:chill-java:jar:0.5.0:compile [INFO] +- org.apache.hadoop:hadoop-client:jar:2.2.0:compile [INFO] | +- org.apache.hadoop:hadoop-common:jar:2.2.0:compile [INFO] | | +- commons-cli:commons-cli:jar:1.2:compile [INFO] | | +- org.apache.commons:commons-math:jar:2.1:compile [INFO] | | +- xmlenc:xmlenc:jar:0.52:compile [INFO] | | +- commons-io:commons-io:jar:2.1:compile [INFO] | | +- commons-logging:commons-logging:jar:1.1.1:compile [INFO] | | +- commons-lang:commons-lang:jar:2.5:compile [INFO] | | +- commons-configuration:commons-configuration:jar:1.6:compile [INFO] | | | +- commons-collections:commons-collections:jar:3.2.1:compile [INFO] | | | +- commons-digester:commons-digester:jar:1.8:compile [INFO] | | | | \- commons-beanutils:commons-beanutils:jar:1.7.0:compile [INFO] | | | \- commons-beanutils:commons-beanutils-core:jar:1.8.0:compile [INFO] | | +- org.codehaus.jackson:jackson-core-asl:jar:1.8.8:compile [INFO] | | +- org.codehaus.jackson:jackson-mapper-asl:jar:1.8.8:compile [INFO] | | +- org.apache.avro:avro:jar:1.7.4:compile [INFO] | | +- com.google.protobuf:protobuf-java:jar:2.5.0:compile [INFO] | | +- org.apache.hadoop:hadoop-auth:jar:2.2.0:compile [INFO] | | \- org.apache.commons:commons-compress:jar:1.4.1:compile [INFO] | | \- org.tukaani:xz:jar:1.0:compile [INFO] | +- org.apache.hadoop:hadoop-hdfs:jar:2.2.0:compile [INFO] | | \- org.mortbay.jetty:jetty-util:jar:6.1.26:compile [INFO] | +- org.apache.hadoop:hadoop-mapreduce-client-app:jar:2.2.0:compile [INFO] | | +- org.apache.hadoop:hadoop-mapreduce-client-common:jar:2.2.0:co mpile [INFO] | | | +- org.apache.hadoop:hadoop-yarn-client:jar:2.2.0:compile [INFO] | | | | +- com.google.inject:guice:jar:3.0:compile [INFO] | | | | | +- javax.inject:javax.inject:jar:1:compile [INFO] | | | | | \- aopalliance:aopalliance:jar:1.0:compile [INFO] | | | | +- com.sun.jersey.jersey-test-framework:jersey-test-framew ork-grizzly2:jar:1.9:compile [INFO] | | | | | +- com.sun.jersey.jersey-test-framework:jersey-test-fra mework-core:jar:1.9:compile [INFO] | | | | | | +- 
javax.servlet:javax.servlet-api:jar:3.0.1:compile [INFO] | | | | | | \- com.sun.jersey:jersey-client:jar:1.9:compile [INFO] | | | | | \- com.sun.jersey:jersey-grizzly2:jar:1.9:compile [INFO] | | | | | +- org.glassfish.grizzly:grizzly-http:jar:2.1.2:comp ile [INFO] | | | | | | \- org.glassfish.grizzly:grizzly-framework:jar:2. 1.2:compile [INFO] | | | | | | \- org.glassfish.gmbal:gmbal-api-only:jar:3.0. 0-b023:compile [INFO] | | | | | | \- org.glassfish.external:management-api:ja r:3.0.0-b012:compile [INFO] | | | | | +- org.glassfish.grizzly:grizzly-http-server:jar:2.1 .2:compile [INFO] | | | | | | \- org.glassfish.grizzly:grizzly-rcm:jar:2.1.2:co mpile [INFO] | | | | | +- org.glassfish.grizzly:grizzly-http-servlet:jar:2. 1.2:compile [INFO] | | | | | \- org.glassfish:javax.servlet:jar:3.1:compile [INFO] | | | | +- com.sun.jersey:jersey-server:jar:1.9:compile [INFO] | | | | | +- asm:asm:jar:3.1:compile [INFO] | | | | | \- com.sun.jersey:jersey-core:jar:1.9:compile [INFO] | | | | +- com.sun.jersey:jersey-json:jar:1.9:compile [INFO] | | | | | +- org.codehaus.jettison:jettison:jar:1.1:compile [INFO] | | | | | | \- stax:stax-api:jar:1.0.1:compile [INFO] | | | | | +- com.sun.xml.bind:jaxb-impl:jar:2.2.3-1:compile [INFO] | | | | | | \- javax.xml.bind:jaxb-api:jar:2.2.2:compile [INFO
Re: Compilation error
I am using maven and my dependencies look like this, but this doesn't seem to be working:
<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.2.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.2.1</version>
  </dependency>
</dependencies>
On Tue, Mar 10, 2015 at 11:06 AM, Tathagata Das t...@databricks.com wrote: If you are using tools like SBT/Maven/Gradle/etc., they figure out all the recursive dependencies and include them in the classpath. I haven't touched Eclipse in years, so I am not sure off the top of my head what's going on there. Just in case you only downloaded the spark-streaming_2.10.jar: that is indeed insufficient and you have to download all the recursive dependencies. Maybe you should create a Maven project inside Eclipse? TD
On Tue, Mar 10, 2015 at 11:00 AM, Mohit Anchlia mohitanch...@gmail.com wrote: How do I do that? I haven't used Scala before. Also, the linking page doesn't mention it: http://spark.apache.org/docs/1.2.0/streaming-programming-guide.html#linking
On Tue, Mar 10, 2015 at 10:57 AM, Sean Owen so...@cloudera.com wrote: It means you do not have the Scala library classes in your project classpath.
On Tue, Mar 10, 2015 at 5:54 PM, Mohit Anchlia mohitanch...@gmail.com wrote: I am trying out the streaming example as documented and I am using spark 1.2.1 streaming from maven for Java. When I add this code I get a compilation error and eclipse is not able to recognize Tuple2. I also don't see the scala.Tuple2 class available to import. http://spark.apache.org/docs/1.2.0/streaming-programming-guide.html#a-quick-example
private void map(JavaReceiverInputDStream<String> lines) {
    JavaDStream<String> words = lines.flatMap(
        new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String x) {
                return Arrays.asList(x.split(" "));
            }
        });
    // Count each word in each batch
    JavaPairDStream<String, Integer> pairs = words.map(
        new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });
}
Re: Compilation error
:compile [INFO] | +- org.apache.hadoop:hadoop-mapreduce-client-core:jar:2.2.0:compile [INFO] | | \- org.apache.hadoop:hadoop-yarn-common:jar:2.2.0:compile [INFO] | +- org.apache.hadoop:hadoop-mapreduce-client-jobclient:jar:2.2.0:co mpile [INFO] | \- org.apache.hadoop:hadoop-annotations:jar:2.2.0:compile [INFO] +- org.apache.spark:spark-network-common_2.10:jar:1.2.1:compile [INFO] +- org.apache.spark:spark-network-shuffle_2.10:jar:1.2.1:compile [INFO] +- net.java.dev.jets3t:jets3t:jar:0.7.1:compile [INFO] | +- commons-codec:commons-codec:jar:1.3:compile [INFO] | \- commons-httpclient:commons-httpclient:jar:3.1:compile [INFO] +- org.apache.curator:curator-recipes:jar:2.4.0:compile [INFO] | +- org.apache.curator:curator-framework:jar:2.4.0:compile [INFO] | | \- org.apache.curator:curator-client:jar:2.4.0:compile [INFO] | +- org.apache.zookeeper:zookeeper:jar:3.4.5:compile [INFO] | | \- jline:jline:jar:0.9.94:compile [INFO] | \- com.google.guava:guava:jar:14.0.1:compile [INFO] +- org.eclipse.jetty:jetty-plus:jar:8.1.14.v20131031:compile [INFO] | +- org.eclipse.jetty.orbit:javax.transaction:jar:1.1.1.v20110521064 5:compile [INFO] | +- org.eclipse.jetty:jetty-webapp:jar:8.1.14.v20131031:compile [INFO] | | +- org.eclipse.jetty:jetty-xml:jar:8.1.14.v20131031:compile [INFO] | | \- org.eclipse.jetty:jetty-servlet:jar:8.1.14.v20131031:compile [INFO] | \- org.eclipse.jetty:jetty-jndi:jar:8.1.14.v20131031:compile [INFO] | \- org.eclipse.jetty.orbit:javax.mail.glassfish:jar:1.4.1.v20100 5082020:compile [INFO] | \- org.eclipse.jetty.orbit:javax.activation:jar:1.1.0.v201105 071233:compile [INFO] +- org.eclipse.jetty:jetty-security:jar:8.1.14.v20131031:compile [INFO] +- org.eclipse.jetty:jetty-util:jar:8.1.14.v20131031:compile [INFO] +- org.apache.commons:commons-lang3:jar:3.3.2:compile [INFO] +- org.apache.commons:commons-math3:jar:3.1.1:compile [INFO] +- com.google.code.findbugs:jsr305:jar:1.3.9:compile [INFO] +- org.slf4j:slf4j-api:jar:1.7.5:compile [INFO] +- org.slf4j:jul-to-slf4j:jar:1.7.5:compile [INFO] +- org.slf4j:jcl-over-slf4j:jar:1.7.5:compile [INFO] +- log4j:log4j:jar:1.2.17:compile [INFO] +- org.slf4j:slf4j-log4j12:jar:1.7.5:compile [INFO] +- com.ning:compress-lzf:jar:1.0.0:compile [INFO] +- org.xerial.snappy:snappy-java:jar:1.1.1.6:compile [INFO] +- net.jpountz.lz4:lz4:jar:1.2.0:compile [INFO] +- org.roaringbitmap:RoaringBitmap:jar:0.4.5:compile [INFO] +- commons-net:commons-net:jar:2.2:compile [INFO] +- org.spark-project.akka:akka-remote_2.10:jar:2.3.4-spark:compile [INFO] | +- org.spark-project.akka:akka-actor_2.10:jar:2.3.4-spark:compile [INFO] | | \- com.typesafe:config:jar:1.2.1:compile [INFO] | +- io.netty:netty:jar:3.8.0.Final:compile [INFO] | +- org.spark-project.protobuf:protobuf-java:jar:2.5.0-spark:compile [INFO] | \- org.uncommons.maths:uncommons-maths:jar:1.2.2a:compile [INFO] +- org.spark-project.akka:akka-slf4j_2.10:jar:2.3.4-spark:compile [INFO] +- org.json4s:json4s-jackson_2.10:jar:3.2.10:compile [INFO] | +- org.json4s:json4s-core_2.10:jar:3.2.10:compile [INFO] | | +- org.json4s:json4s-ast_2.10:jar:3.2.10:compile [INFO] | | +- com.thoughtworks.paranamer:paranamer:jar:2.6:compile [INFO] | | \- org.scala-lang:scalap:jar:2.10.0:compile [INFO] | | \- org.scala-lang:scala-compiler:jar:2.10.0:compile [INFO] | | \- org.scala-lang:scala-reflect:jar:2.10.0:compile [INFO] | \- com.fasterxml.jackson.core:jackson-databind:jar:2.3.1:compile [INFO] | +- com.fasterxml.jackson.core:jackson-annotations:jar:2.3.0:comp ile [INFO] | \- com.fasterxml.jackson.core:jackson-core:jar:2.3.1:compile 
[INFO] +- org.apache.mesos:mesos:jar:shaded-protobuf:0.18.1:compile [INFO] +- io.netty:netty-all:jar:4.0.23.Final:compile [INFO] +- com.clearspring.analytics:stream:jar:2.7.0:compile [INFO] +- com.codahale.metrics:metrics-core:jar:3.0.0:compile [INFO] +- com.codahale.metrics:metrics-jvm:jar:3.0.0:compile [INFO] +- com.codahale.metrics:metrics-json:jar:3.0.0:compile [INFO] +- com.codahale.metrics:metrics-graphite:jar:3.0.0:compile [INFO] +- org.tachyonproject:tachyon-client:jar:0.5.0:compile [INFO] | \- org.tachyonproject:tachyon:jar:0.5.0:compile [INFO] +- org.spark-project:pyrolite:jar:2.0.1:compile [INFO] \- net.sf.py4j:py4j:jar:0.8.2.1:compile On Tue, Mar 10, 2015 at 11:40 AM, Sean Owen so...@cloudera.com wrote: A couple points: You've got mismatched versions here -- 1.2.0 vs 1.2.1. You should fix that but it's not your problem. These are also supposed to be 'provided' scope dependencies in Maven. You should get the Scala deps transitively and can import scala.* classes. However, it would be a little bit more correct to depend directly on the scala library classes, but in practice, easiest not to in simple use cases. If you're still having trouble look at the output of mvn dependency:tree On Tue, Mar 10, 2015 at 6:32 PM, Mohit Anchlia mohitanch...@gmail.com wrote: I am using maven and my dependency looks like this, but this doesn't seem
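Applied to the pom above, that advice would look roughly like this: align both artifacts on the same version (1.2.1 here, as an example) and mark them provided so the cluster's own Spark jars are used at run time; when running the driver directly from the IDE the jars still need to end up on the classpath.
<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.2.1</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.2.1</version>
    <scope>provided</scope>
  </dependency>
</dependencies>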
Hadoop Map vs Spark stream Map
Hi, I am trying to understand the Hadoop map method compared to the Spark map, and I noticed that the Spark map function only receives 3 arguments: 1) input value 2) output key 3) output value, whereas the Hadoop map has 4: 1) input key 2) input value 3) output key 4) output value. Is there any reason it was designed this way? Just trying to understand:
Hadoop: public void map(K key, V val, OutputCollector<K, V> output, Reporter reporter)
// Count each word in each batch
JavaPairDStream<String, Integer> pairs = words.mapToPair(
    new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String s) throws Exception {
            return new Tuple2<String, Integer>(s, 1);
        }
    });
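One way to read the difference (an informal sketch, not official documentation): Hadoop's map is push-style; it is handed an input key (for text input, the byte offset of the line) and writes any number of output pairs to a collector, while Spark's functions are plain transformations whose return value is the output, and output keys only exist once a Tuple2 is constructed. For a socket text stream there is no meaningful input key, so only the value is passed.
// Hadoop style (simplified, old mapred API): input key + value in, pairs pushed to a collector.
//   public void map(LongWritable offset, Text line,
//                   OutputCollector<Text, IntWritable> out, Reporter reporter) {
//       out.collect(new Text(line.toString()), new IntWritable(1));
//   }

// Spark style: the function's return value is the output; no collector, no input key.
JavaPairDStream<String, Integer> spkPairs = words.mapToPair(
    new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String s) {
            return new Tuple2<String, Integer>(s, 1);
        }
    });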
Re: Compilation error on JavaPairDStream
It works now. I should have checked :)
On Tue, Mar 10, 2015 at 1:44 PM, Sean Owen so...@cloudera.com wrote: Ah, that's a typo in the example: use words.mapToPair. I can make a little PR to fix that.
On Tue, Mar 10, 2015 at 8:32 PM, Mohit Anchlia mohitanch...@gmail.com wrote: I am getting the following error. When I look at the sources it seems to be a Scala source, but I am not sure why it's complaining about it. The method map(Function<String,R>) in the type JavaDStream<String> is not applicable for the arguments (new PairFunction<String,String,Integer>(){}) And my code has been taken from the spark examples site:
JavaPairDStream<String, Integer> pairs = words.map(
    new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String s) throws Exception {
            return new Tuple2<String, Integer>(s, 1);
        }
    });
SQL with Spark Streaming
Does Spark Streaming also supports SQLs? Something like how Esper does CEP.
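There is no built-in CEP/SQL operator in the DStream API itself, but SQL can be run over each micro-batch from foreachRDD. The sketch below assumes a Spark 1.3+ style API (SQLContext.createDataFrame, registerTempTable) and a hypothetical Record bean, and reuses the words DStream from the earlier word-count examples; older releases such as 1.2 expose a slightly different Java SQL API, so treat the exact class and method names as assumptions.
// Hypothetical bean describing one record of the stream.
public class Record implements java.io.Serializable {
    private String word;
    public Record() { }
    public Record(String word) { this.word = word; }
    public String getWord() { return word; }
    public void setWord(String word) { this.word = word; }
}

// Run a SQL query over every micro-batch (sketch; assumes the Spark 1.3+ DataFrame API).
final SQLContext sqlContext = new SQLContext(jssc.sparkContext());
words.foreachRDD(
    new Function<JavaRDD<String>, Void>() {
        @Override
        public Void call(JavaRDD<String> rdd) {
            JavaRDD<Record> records = rdd.map(
                new Function<String, Record>() {
                    @Override
                    public Record call(String w) { return new Record(w); }
                });
            DataFrame table = sqlContext.createDataFrame(records, Record.class);
            table.registerTempTable("words");
            DataFrame counts = sqlContext.sql(
                "SELECT word, COUNT(*) AS total FROM words GROUP BY word");
            counts.show();   // or write the result to an external store
            return null;
        }
    });
For true event-at-a-time CEP semantics in the style of Esper, the micro-batch model above only approximates it at the batch-interval granularity; windowed DStream operations cover some of the same ground.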