Support for data flow graphs and not only DAGs
Hey, I didn't find any documentation regarding support for cycles in a Spark topology, although Storm supports this using manual configuration in the acker function logic (setting it to a particular count). By cycles I don't mean infinite loops. -- Thanks Regards, Anshu Shukla
Spark log analyzer sample
Exception in thread "main" java.lang.RuntimeException: org.apache.hadoop.ipc.RemoteException: Server IPC version 9 cannot communicate with client version 4. I am not using any Hadoop facility (not even HDFS), so why is it giving this error? -- Thanks Regards, Anshu Shukla
Fwd: Event generator for SPARK-Streaming from csv
I have the real DEBS taxi data in a CSV file; in order to operate over it, how do I simulate a Spout-like event generator using the timestamps in the CSV file? -- Thanks Regards, Anshu Shukla
Event generator for SPARK-Streaming from csv
I have the real DEBS taxi data in a CSV file; in order to operate over it, how do I simulate a Spout-like event generator using the timestamps in the CSV file? -- SERC-IISC Thanks Regards, Anshu Shukla
Re: Creating topology in spark streaming
Thanks a lot Juan, that was a great post. One more thing if you can: is there any demo/blog telling how to configure or create a topology of different types, i.e. how we can decide the pipelining model in Spark as is done in Storm for https://storm.apache.org/documentation/images/topology.png .

On Wed, May 6, 2015 at 2:47 PM, Juan Rodríguez Hortalá juan.rodriguez.hort...@gmail.com wrote: Hi, You can use the method repartition from DStream (for the Scala API) or JavaDStream (for the Java API): def repartition(numPartitions: Int): DStream[T] (https://spark.apache.org/docs/latest/api/scala/org/apache/spark/streaming/dstream/DStream.html) returns a new DStream with an increased or decreased level of parallelism; each RDD in the returned DStream has exactly numPartitions partitions. I think the post http://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial/ on integration of Spark Streaming gives a very interesting review of the subject, although the integration with Kafka is not up to date with https://databricks.com/blog/2015/03/30/improvements-to-kafka-integration-of-spark-streaming.html . Hope that helps. Greetings, Juan

2015-05-06 10:32 GMT+02:00 anshu shukla anshushuk...@gmail.com: But the main problem is how to increase the level of parallelism for any particular bolt's logic. Suppose I want this type of topology: https://storm.apache.org/documentation/images/topology.png . How can we manage it?

On Wed, May 6, 2015 at 1:36 PM, ayan guha guha.a...@gmail.com wrote: Every transformation on a DStream will create another DStream. You may want to take a look at foreachRDD? Also, kindly share your code so people can help better.

On 6 May 2015 17:54, anshu shukla anshushuk...@gmail.com wrote: Please help guys, even after going through all the examples given I have not understood how to pass the D-Streams from one bolt/logic to another (without writing them to HDFS etc.), just like the emit function in Storm. Suppose I have a topology with 3 bolts (say): *BOLT1 (parse the tweets and emit tweets matching the given hashtags) => BOLT2 (complex logic for sentiment analysis over tweets) => BOLT3 (submit tweets to the SQL database using Spark SQL)*. Now since sentiment analysis will take most of the time, we have to increase its level of parallelism for tuning latency. How to increase the level of parallelism, since the logic of the topology is not clear? -- Thanks Regards, Anshu Shukla Indian Institute of Science -- Thanks Regards, Anshu Shukla -- Thanks Regards, Anshu Shukla
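A minimal Java sketch of the pipeline discussed in this thread, using repartition() (as Juan suggests) to widen only the expensive sentiment stage. The socket source, hashtag, score() function and output path are hypothetical stand-ins for the real Twitter source and Spark SQL sink:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class PipelineSketch {
  public static void wire(JavaStreamingContext jssc) {
    // stand-in source; in the thread above this would be the Twitter stream
    JavaDStream<String> tweets = jssc.socketTextStream("localhost", 9999);

    // "BOLT1": keep only tweets carrying the hashtag of interest
    JavaDStream<String> tagged = tweets.filter(new Function<String, Boolean>() {
      public Boolean call(String t) { return t.contains("#spark"); }
    });

    // "BOLT2": the expensive stage, widened with repartition() before the heavy map
    JavaDStream<String> scored = tagged.repartition(8).map(new Function<String, String>() {
      public String call(String t) { return t + "," + score(t); }
    });

    // "BOLT3": persist each batch (stand-in for the Spark SQL write)
    scored.foreachRDD(new Function<JavaRDD<String>, Void>() {
      public Void call(JavaRDD<String> rdd) {
        rdd.saveAsTextFile("/tmp/scored-" + System.currentTimeMillis());
        return null;
      }
    });
  }

  static double score(String tweet) { return 0.0; }   // hypothetical sentiment function
}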
Re: Creating topology in spark streaming
But the main problem is how to increase the level of parallelism for any particular bolt's logic. Suppose I want this type of topology: https://storm.apache.org/documentation/images/topology.png . How can we manage it?

On Wed, May 6, 2015 at 1:36 PM, ayan guha guha.a...@gmail.com wrote: Every transformation on a DStream will create another DStream. You may want to take a look at foreachRDD? Also, kindly share your code so people can help better.

On 6 May 2015 17:54, anshu shukla anshushuk...@gmail.com wrote: Please help guys, even after going through all the examples given I have not understood how to pass the D-Streams from one bolt/logic to another (without writing them to HDFS etc.), just like the emit function in Storm. Suppose I have a topology with 3 bolts (say): *BOLT1 (parse the tweets and emit tweets matching the given hashtags) => BOLT2 (complex logic for sentiment analysis over tweets) => BOLT3 (submit tweets to the SQL database using Spark SQL)*. Now since sentiment analysis will take most of the time, we have to increase its level of parallelism for tuning latency. How to increase the level of parallelism, since the logic of the topology is not clear? -- Thanks Regards, Anshu Shukla Indian Institute of Science -- Thanks Regards, Anshu Shukla
Creating topology in spark streaming
Please help guys, even after going through all the examples given I have not understood how to pass the D-Streams from one bolt/logic to another (without writing them to HDFS etc.), just like the emit function in Storm. Suppose I have a topology with 3 bolts (say): *BOLT1 (parse the tweets and emit tweets matching the given hashtags) => BOLT2 (complex logic for sentiment analysis over tweets) => BOLT3 (submit tweets to the SQL database using Spark SQL)*. Now since sentiment analysis will take most of the time, we have to increase its level of parallelism for tuning latency. How to increase the level of parallelism, since the logic of the topology is not clear? -- Thanks Regards, Anshu Shukla Indian Institute of Science
Predict.scala using the model for clustering (in reference)
Can anyone please explain: println("Initializing the KMeans model...") ; val model = new KMeansModel(ssc.sparkContext.objectFile[Vector](modelFile.toString).collect()) where modelFile is the *directory used to persist the model while training*. REF: https://github.com/databricks/reference-apps/blob/master/twitter_classifier/predict.md -- Thanks Regards, Anshu Shukla
Re: Map one RDD into two RDD
One of the best discussions on the mailing list :-) ... Please help me in concluding -- the whole discussion concludes that: 1- The framework does not support increasing the parallelism of any task just by some inbuilt function. 2- The user has to manually write logic to filter the output of an upstream node in the DAG to manage the input to downstream nodes (like shuffle grouping etc. in Storm). 3- If we want to increase the level of parallelism of the Twitter streaming spout to *get a higher rate of the DStream of tweets (to increase the input rate), how is that possible?* *val tweetStream = TwitterUtils.createStream(ssc, Utils.getAuth)*

On Fri, May 8, 2015 at 2:16 AM, Evo Eftimov evo.efti...@isecc.com wrote: 1. Will rdd2.filter run before rdd1.filter finishes? YES 2. We have to traverse the RDD twice. Any comments? You can invoke filter or whatever other transformation / function many times. Ps: you have to study / learn the Parallel Programming Model of an OO Framework like Spark – in any OO Framework lots of Behavior is hidden / encapsulated by the Framework and the client code gets invoked at specific points in the Flow of Control / Data based on callback functions. That's why stuff like RDD.filter(), RDD.filter() may look "sequential" to you but it is not.

*From:* Bill Q [mailto:bill.q@gmail.com] *Sent:* Thursday, May 7, 2015 6:27 PM *To:* Evo Eftimov *Cc:* user@spark.apache.org *Subject:* Re: Map one RDD into two RDD The multi-threading code in Scala is quite simple and you can google it pretty easily. We used the Future framework. You can use Akka also. @Evo My concerns for the filtering solution are: 1. Will rdd2.filter run before rdd1.filter finishes? 2. We have to traverse the RDD twice. Any comments?

On Thursday, May 7, 2015, Evo Eftimov evo.efti...@isecc.com wrote: Scala is a language, Spark is an OO/Functional, Distributed Framework facilitating Parallel Programming in a distributed environment. Any "Scala parallelism" occurs within the Parallel Model imposed by the Spark OO Framework – i.e. it is limited in terms of what it can achieve in terms of influencing the Spark Framework behavior – that is the nature of programming with/for frameworks. When RDD1 and RDD2 are partitioned and different Actions applied to them this will result in Parallel Pipelines / DAGs within the Spark Framework. RDD1 = RDD.filter() RDD2 = RDD.filter()

*From:* Bill Q [mailto:bill.q@gmail.com] *Sent:* Thursday, May 7, 2015 4:55 PM *To:* Evo Eftimov *Cc:* user@spark.apache.org *Subject:* Re: Map one RDD into two RDD Thanks for the replies. We decided to use concurrency in Scala to do the two mappings using the same source RDD in parallel. So far, it seems to be working. Any comments?

On Wednesday, May 6, 2015, Evo Eftimov evo.efti...@isecc.com wrote: RDD1 = RDD.filter() RDD2 = RDD.filter()

*From:* Bill Q [mailto:bill.q@gmail.com] *Sent:* Tuesday, May 5, 2015 10:42 PM *To:* user@spark.apache.org *Subject:* Map one RDD into two RDD Hi all, I have a large RDD that I map a function over. Based on the nature of each record in the input RDD, I will generate two types of data. I would like to save each type into its own RDD. But I can't seem to find an efficient way to do it. Any suggestions? Many thanks. Bill -- Many thanks. Bill -- Many thanks. Bill -- Many thanks. Bill -- Thanks Regards, Anshu Shukla
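A compact Java rendering of Evo's two-filter suggestion for Bill's original question: cache the source once, then derive each output RDD with its own filter. The type-A/type-B predicates are placeholders:

import java.util.Arrays;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;

public class SplitByType {
  public static List<JavaRDD<String>> split(JavaRDD<String> source) {
    source.cache();   // both children re-read the same data, so cache it once
    JavaRDD<String> typeA = source.filter(new Function<String, Boolean>() {
      public Boolean call(String r) { return r.startsWith("A"); }    // placeholder predicate
    });
    JavaRDD<String> typeB = source.filter(new Function<String, Boolean>() {
      public Boolean call(String r) { return !r.startsWith("A"); }
    });
    return Arrays.asList(typeA, typeB);
  }
}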
[no subject]
Exception with sample testing in IntelliJ IDE:

Exception in thread "main" java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class
    at akka.util.Collections$EmptyImmutableSeq$.<init>(Collections.scala:15)
    at akka.util.Collections$EmptyImmutableSeq$.<clinit>(Collections.scala)
    at akka.japi.Util$.immutableSeq(JavaAPI.scala:229)
    at akka.remote.RemoteSettings.<init>(RemoteSettings.scala:30)
    at akka.remote.RemoteActorRefProvider.<init>(RemoteActorRefProvider.scala:114)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(DynamicAccess.scala:78)
    at scala.util.Try$.apply(Try.scala:191)
    at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:73)
    at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
    at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
    at scala.util.Success.flatMap(Try.scala:230)
    at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:84)
    at akka.actor.ActorSystemImpl.liftedTree1$1(ActorSystem.scala:584)
    at akka.actor.ActorSystemImpl.<init>(ActorSystem.scala:577)
    at akka.actor.ActorSystem$.apply(ActorSystem.scala:141)
    at akka.actor.ActorSystem$.apply(ActorSystem.scala:118)
    at org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:122)
    at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:55)
    at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54)
    at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1837)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:166)
    at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1828)
    at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:57)
    at org.apache.spark.SparkEnv$.create(SparkEnv.scala:223)
    at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:163)
    at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:269)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:272)
    *at Testspark$.main(Testspark.scala:17)*
    at Testspark.main(Testspark.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
Caused by: java.lang.ClassNotFoundException: scala.collection.GenTraversableOnce$class
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    ... 38 more

*Code is Testspark.scala -*
/**
 * Created by anshushukla on 07/05/15.
 */
import org.apache.spark.{SparkConf, SparkContext}

object Testspark {

  def main(args: Array[String]) {

    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("TestSpark")

    val sc = new SparkContext(conf)  // line number 17 showing exception

    val data = sc.parallelize(1 to 10).collect().filter(_ < 1000)
    data.foreach(println)
  }
}

*build.sbt is -*

name := "scala-test-workspace"

version := "1.0"

scalaVersion := "2.11.6"

libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.3.1"

-- Thanks Regards, Anshu Shukla Indian Institute of Science
Re: Event generator for SPARK-Streaming from csv
I know these methods, but I need to create events using the timestamps in the data tuples, meaning every new tuple is generated according to the timestamp in the CSV file. This will be useful to simulate the data rate over time just like real sensor data.

On Fri, May 1, 2015 at 2:52 PM, Juan Rodríguez Hortalá juan.rodriguez.hort...@gmail.com wrote: Hi, Maybe you could use streamingContext.fileStream like in the example from https://spark.apache.org/docs/latest/streaming-programming-guide.html#input-dstreams-and-receivers; you can read from files on any file system compatible with the HDFS API (that is, HDFS, S3, NFS, etc.). You could split the file into several smaller files, and move them to the target folder one by one with some sleep time in between to simulate a stream of data with custom granularity. Hope that helps, Greetings, Juan

2015-05-01 9:30 GMT+02:00 anshu shukla anshushuk...@gmail.com: I have the real DEBS taxi data in a CSV file; in order to operate over it, how do I simulate a Spout-like event generator using the timestamps in the CSV file? -- Thanks Regards, Anshu Shukla -- Thanks Regards, Anshu Shukla
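A rough, hedged Java sketch of Juan's suggestion (not the timestamp-driven DEBS generator itself): split the CSV into small chunks, drop them into a directory that Spark Streaming watches with textFileStream(), and sleep between chunks to imitate the source rate. The paths, chunk size and sleep are made-up values:

import java.nio.file.*;
import java.util.List;

public class CsvReplay {
  public static void main(String[] args) throws Exception {
    final String inputCsv = "/data/debs-taxi.csv";   // hypothetical input path
    final String watchDir = "/data/stream-in";        // directory watched by the streaming job

    // Streaming side (in the Spark job):
    //   JavaDStream<String> lines = jssc.textFileStream(watchDir);

    // Replay side: write N lines at a time, then sleep to imitate the source rate
    List<String> all = Files.readAllLines(Paths.get(inputCsv));
    int chunk = 1000;
    for (int i = 0; i < all.size(); i += chunk) {
      Path tmp = Paths.get(watchDir, "part-" + i + ".tmp");
      Files.write(tmp, all.subList(i, Math.min(i + chunk, all.size())));
      // rename so Spark only ever sees complete files
      Files.move(tmp, Paths.get(watchDir, "part-" + i + ".csv"), StandardCopyOption.ATOMIC_MOVE);
      Thread.sleep(1000);   // crude stand-in for the real CSV timestamps
    }
  }
}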
Re: Map one RDD into two RDD
Any update on the above mail? And can anyone tell me the logic: I have to filter tweets and submit tweets with a particular #hashtag1 to Spark SQL databases, while tweets with #hashtag2 will be passed to a sentiment analysis phase. The problem is how to split the input data into two streams using hashtags.

On Fri, May 8, 2015 at 2:42 AM, anshu shukla anshushuk...@gmail.com wrote: One of the best discussions on the mailing list :-) ... Please help me in concluding -- the whole discussion concludes that: 1- The framework does not support increasing the parallelism of any task just by some inbuilt function. 2- The user has to manually write logic to filter the output of an upstream node in the DAG to manage the input to downstream nodes (like shuffle grouping etc. in Storm). 3- If we want to increase the level of parallelism of the Twitter streaming spout to *get a higher rate of the DStream of tweets (to increase the input rate), how is that possible?* *val tweetStream = TwitterUtils.createStream(ssc, Utils.getAuth)*

On Fri, May 8, 2015 at 2:16 AM, Evo Eftimov evo.efti...@isecc.com wrote: 1. Will rdd2.filter run before rdd1.filter finishes? YES 2. We have to traverse the RDD twice. Any comments? You can invoke filter or whatever other transformation / function many times. Ps: you have to study / learn the Parallel Programming Model of an OO Framework like Spark – in any OO Framework lots of Behavior is hidden / encapsulated by the Framework and the client code gets invoked at specific points in the Flow of Control / Data based on callback functions. That's why stuff like RDD.filter(), RDD.filter() may look "sequential" to you but it is not.

*From:* Bill Q [mailto:bill.q@gmail.com] *Sent:* Thursday, May 7, 2015 6:27 PM *To:* Evo Eftimov *Cc:* user@spark.apache.org *Subject:* Re: Map one RDD into two RDD The multi-threading code in Scala is quite simple and you can google it pretty easily. We used the Future framework. You can use Akka also. @Evo My concerns for the filtering solution are: 1. Will rdd2.filter run before rdd1.filter finishes? 2. We have to traverse the RDD twice. Any comments?

On Thursday, May 7, 2015, Evo Eftimov evo.efti...@isecc.com wrote: Scala is a language, Spark is an OO/Functional, Distributed Framework facilitating Parallel Programming in a distributed environment. Any "Scala parallelism" occurs within the Parallel Model imposed by the Spark OO Framework – i.e. it is limited in terms of what it can achieve in terms of influencing the Spark Framework behavior – that is the nature of programming with/for frameworks. When RDD1 and RDD2 are partitioned and different Actions applied to them this will result in Parallel Pipelines / DAGs within the Spark Framework. RDD1 = RDD.filter() RDD2 = RDD.filter()

*From:* Bill Q [mailto:bill.q@gmail.com] *Sent:* Thursday, May 7, 2015 4:55 PM *To:* Evo Eftimov *Cc:* user@spark.apache.org *Subject:* Re: Map one RDD into two RDD Thanks for the replies. We decided to use concurrency in Scala to do the two mappings using the same source RDD in parallel. So far, it seems to be working. Any comments?

On Wednesday, May 6, 2015, Evo Eftimov evo.efti...@isecc.com wrote: RDD1 = RDD.filter() RDD2 = RDD.filter()

*From:* Bill Q [mailto:bill.q@gmail.com] *Sent:* Tuesday, May 5, 2015 10:42 PM *To:* user@spark.apache.org *Subject:* Map one RDD into two RDD Hi all, I have a large RDD that I map a function over. Based on the nature of each record in the input RDD, I will generate two types of data. I would like to save each type into its own RDD. But I can't seem to find an efficient way to do it. Any suggestions? Many thanks. Bill -- Many thanks. Bill -- Many thanks. Bill -- Many thanks. Bill -- Thanks Regards, Anshu Shukla -- Thanks Regards, Anshu Shukla
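For the hashtag question at the top of this thread, a minimal Java sketch of the filter-based split Evo describes: cache the source stream once and derive one branch per hashtag. The hashtag literals and the print() sinks are placeholders for the real Spark SQL write and sentiment stage:

import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaDStream;

public class HashtagSplit {
  public static void split(JavaDStream<String> tweets) {
    tweets.cache();   // both branches read the same batches; cache avoids recomputing the source
    JavaDStream<String> forSql = tweets.filter(new Function<String, Boolean>() {
      public Boolean call(String t) { return t.contains("#hashtag1"); }   // placeholder tag
    });
    JavaDStream<String> forSentiment = tweets.filter(new Function<String, Boolean>() {
      public Boolean call(String t) { return t.contains("#hashtag2"); }   // placeholder tag
    });
    forSql.print();        // stand-in for the Spark SQL write
    forSentiment.print();  // stand-in for the sentiment-analysis stage
  }
}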
Event generation
http://stackoverflow.com/questions/30149868/generate-events-tuples-using-csv-file-with-timestamps -- Thanks Regards, Anshu Shukla
Re: Event generation
I don't know how to simulate this type of input even for Spark.

On Tue, May 12, 2015 at 3:02 PM, Steve Loughran ste...@hortonworks.com wrote: I think you may want to try emailing things to the Storm users list, not the Spark one.

On 11 May 2015, at 15:42, Tyler Mitchell tyler.mitch...@actian.com wrote: I've had good success with the Splunk event generator. https://github.com/coccyx/eventgen/blob/master/README.md

On May 11, 2015, at 00:05, Akhil Das ak...@sigmoidanalytics.com wrote: Have a look over here https://storm.apache.org/community.html Thanks Best Regards

On Sun, May 10, 2015 at 3:21 PM, anshu shukla anshushuk...@gmail.com wrote: http://stackoverflow.com/questions/30149868/generate-events-tuples-using-csv-file-with-timestamps -- Thanks Regards, Anshu Shukla -- Thanks Regards, Anshu Shukla
Re: Multiple executors writing file using java filewriter
Running perfectly on the local system but not writing to the file in cluster mode. Any suggestions please ..

// msgId is a long counter
JavaDStream<String> newinputStream = inputStream.map(new Function<String, String>() {
    @Override
    public String call(String v1) throws Exception {
        String s1 = msgId + "@" + v1;
        System.out.println(s1);
        msgId++;
        try {
            // filewriter logic
            spoutlog.batchLogwriter(System.currentTimeMillis(), "spout-MSGID," + msgeditor.getMessageId(s1));
        } catch (Exception e) {
            System.out.println("exception is here");
            e.printStackTrace();
            throw e;
        }
        System.out.println("msgid," + msgId);
        return msgeditor.addMessageId(v1, msgId);
    }
});
-- Thanks Regards, Anshu Shukla

On Mon, Jun 22, 2015 at 10:50 PM, anshu shukla anshushuk...@gmail.com wrote: Can we not write some data to a txt file in parallel with multiple executors running in parallel?? -- Thanks Regards, Anshu Shukla -- Thanks Regards, Anshu Shukla
Multiple executors writing file using java filewriter
Can we not write some data to a txt file in parallel with multiple executors running in parallel?? -- Thanks Regards, Anshu Shukla
Re: Assigning number of workers in spark streaming
Thanks a lot! But in client mode can we assign the number of workers/nodes as a flag parameter to the spark-submit command? And by default how will it distribute the load across the nodes?

# Run on a Spark Standalone cluster in client deploy mode
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master spark://207.184.161.138:7077 \
  --executor-memory 20G \
  --total-executor-cores 100 \
  /path/to/examples.jar \
  1000

On Sat, Jun 20, 2015 at 3:18 AM, Tathagata Das t...@databricks.com wrote: Depends on which cluster manager you are using. It's all pretty well documented in the online documentation. http://spark.apache.org/docs/latest/submitting-applications.html

On Fri, Jun 19, 2015 at 2:29 PM, anshu shukla anshushuk...@gmail.com wrote: Hey, *[For Client Mode]* 1- Is there any way to assign how many workers from a cluster should be used for a particular application? 2- If not, then how does the Spark scheduler decide the scheduling of different applications inside one full logic? Say my logic has {inputStream -> wordsplitter -> wordcount -> statistical analysis}; on how many workers will it be scheduled? -- Thanks Regards, Anshu Shukla SERC-IISC -- Thanks Regards, Anshu Shukla
Assigning number of workers in spark streaming
Hey, *[For Client Mode]* 1- Is there any way to assign how many workers from a cluster should be used for a particular application? 2- If not, then how does the Spark scheduler decide the scheduling of different applications inside one full logic? Say my logic has {inputStream -> wordsplitter -> wordcount -> statistical analysis}; on how many workers will it be scheduled? -- Thanks Regards, Anshu Shukla SERC-IISC
Verifying number of workers in Spark Streaming
How do I know that, in stream processing over a cluster of 8 machines, all the machines/worker nodes are being used (my cluster has 8 slaves)? -- Thanks Regards, Anshu Shukla
Updating a static variable inside the foreachRDD method
I want to log the timestamp of every element of the RDD, so I have assigned a msgId to every element inside the RDD and incremented it (a static variable). My code gives distinct msgIds in local mode, but in cluster mode the value is duplicated every 30-40 counts. Please help !!

// public static long msgId = 0;
newinputStream.foreachRDD(new Function2<JavaRDD<String>, Time, Void>() {
    @Override
    public Void call(JavaRDD<String> v1, Time v2) throws Exception {
        for (String s : v1.collect()) {
            spoutlog.batchLogwriter(System.currentTimeMillis(), "spout-MSGID," + msgeditor.getMessageId(s));
            // System.out.println(msgeditor.getMessageId(s));
        }
        return null;
    }
});
-- Thanks Regards, Anshu Shukla
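As a hedged alternative to the static msgId counter (which lives separately in each executor JVM, hence the duplicates), Spark's own zipWithUniqueId() can assign the IDs inside the framework; a sketch assuming the same newinputStream of strings. Note the IDs are only unique within each batch's RDD, so prefix the batch time if stream-wide uniqueness is needed:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaDStream;
import scala.Tuple2;

public class UniqueIds {
  public static JavaDStream<String> tag(JavaDStream<String> in) {
    return in.transform(new Function<JavaRDD<String>, JavaRDD<String>>() {
      public JavaRDD<String> call(JavaRDD<String> rdd) {
        return rdd.zipWithUniqueId().map(new Function<Tuple2<String, Long>, String>() {
          public String call(Tuple2<String, Long> t) {
            return t._2() + "@" + t._1();   // id@payload, mirroring the msgId + "@" + v1 format above
          }
        });
      }
    });
  }
}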
Using Accumulators in Streaming
In Spark Streaming, since we already have a StreamingContext, which does not allow us to create accumulators, we have to get a SparkContext for initializing the accumulator value. But having 2 Spark contexts will not solve the problem. Please help !! -- Thanks Regards, Anshu Shukla
Fwd: Verifying number of workers in Spark Streaming
Any suggestions please ..!! How do I know that, in stream processing over a cluster of 8 machines, all the machines/worker nodes are being used (my cluster has 8 slaves)? I am submitting the job from the master itself over the EC2 cluster created by the EC2 scripts available with Spark, but I am not able to figure out whether my job is using all the workers or not. -- Thanks Regards, Anshu Shukla SERC-IISC
Re: Using Accumulators in Streaming
But I just want to update the RDD by appending a unique message ID to each element of the RDD, which should be automatically (m++ ..) updated every time a new element comes into the RDD.

On Mon, Jun 22, 2015 at 7:05 AM, Michal Čizmazia mici...@gmail.com wrote: StreamingContext.sparkContext()

On 21 June 2015 at 21:32, Will Briggs wrbri...@gmail.com wrote: It sounds like accumulators are not necessary in Spark Streaming - see this post (http://apache-spark-user-list.1001560.n3.nabble.com/Shared-variable-in-Spark-Streaming-td11762.html) for more details.

On June 21, 2015, at 7:31 PM, anshu shukla anshushuk...@gmail.com wrote: In Spark Streaming, since we already have a StreamingContext, which does not allow us to create accumulators, we have to get a SparkContext for initializing the accumulator value. But having 2 Spark contexts will not solve the problem. Please help !! -- Thanks Regards, Anshu Shukla -- Thanks Regards, Anshu Shukla
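A small sketch of what Michal is pointing at: no second context is needed, because the SparkContext inside the JavaStreamingContext can create the accumulator. Accumulators work for driver-side totals like this, but they cannot be read on executors, so by themselves they do not give per-element message IDs:

import org.apache.spark.Accumulator;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class CountWithAccumulator {
  public static void wire(JavaStreamingContext jssc, JavaDStream<String> stream) {
    final Accumulator<Integer> seen = jssc.sparkContext().accumulator(0);
    stream.foreachRDD(new Function2<JavaRDD<String>, Time, Void>() {
      public Void call(JavaRDD<String> rdd, Time time) {
        rdd.foreach(new VoidFunction<String>() {
          public void call(String s) { seen.add(1); }   // executors add to the accumulator
        });
        System.out.println(time + " total seen so far: " + seen.value());  // driver reads it
        return null;
      }
    });
  }
}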
Loss of data due to congestion
How does Spark guarantee that no RDD will fail / be lost during its life cycle? Is there something like acking in Storm, or does it do this by default? -- Thanks Regards, Anshu Shukla
Re: Loss of data due to congestion
Thanks, I am talking about streaming. On 25 Jun 2015 5:37 am, ayan guha guha.a...@gmail.com wrote: Can you elaborate a little more? Are you talking about the receiver or streaming? On 24 Jun 2015 23:18, anshu shukla anshushuk...@gmail.com wrote: How does Spark guarantee that no RDD will fail / be lost during its life cycle? Is there something like acking in Storm, or does it do this by default? -- Thanks Regards, Anshu Shukla
Re: Parsing a tsv file with key value pairs
Can you be more specific, or can you provide a sample file?

On Thu, Jun 25, 2015 at 11:00 AM, Ravikant Dindokar ravikant.i...@gmail.com wrote: Hi Spark users, I am new to Spark so forgive me for asking a basic question. I'm trying to import my TSV file into Spark. This file has a key and value separated by a \t per line. I want to import this file as a dictionary of key value pairs in Spark. I came across this code to do the same for a CSV file:

import csv
import StringIO
...
def loadRecord(line):
    """Parse a CSV line"""
    input = StringIO.StringIO(line)
    reader = csv.DictReader(input, fieldnames=["name", "favouriteAnimal"])
    return reader.next()

input = sc.textFile(inputFile).map(loadRecord)

Can you point out the changes required to parse a TSV file? After the following operation: split_lines = lines.map(_.split("\t")), what should I do to read the key values into a dictionary? Thanks, Ravikant -- Thanks Regards, Anshu Shukla
Re: Latency between the RDD in Streaming
How can I know for how much time a particular RDD has remained in the pipeline?

On Fri, Jun 19, 2015 at 7:59 AM, Tathagata Das t...@databricks.com wrote: Why do you need to uniquely identify the message? All you need is the time when the message was inserted by the receiver, and when it is processed, isn't it?

On Thu, Jun 18, 2015 at 2:28 PM, anshu shukla anshushuk...@gmail.com wrote: Thanks a lot, but I have already tried the second way. The problem with that is how to identify a particular RDD from source to sink (as we can do by passing a msg id in Storm). For that I just updated the RDD and added a msgID (as a static variable), but while dumping them to a file some of the tuples of the RDD are failed/missed (approx 3000, and the data rate is approx 1500 tuples/sec).

On Fri, Jun 19, 2015 at 2:50 AM, Tathagata Das t...@databricks.com wrote: Couple of ways. 1. Easy but approximate way: Find the scheduling delay and processing time using the StreamingListener interface, and then calculate end-to-end delay = 0.5 * batch interval + scheduling delay + processing time. The 0.5 * batch interval is the approximate average batching delay across all the records in the batch. 2. Hard but precise way: You could build a custom receiver that embeds the current timestamp in the records, and then compare them with the timestamp at the final step of the records. Assuming the executor and driver clocks are reasonably in sync, this will measure the latency between the time the record is received by the system and the time the result from the record is available.

On Thu, Jun 18, 2015 at 2:12 PM, anshu shukla anshushuk...@gmail.com wrote: Sorry, I missed the word LATENCY .. for a large streaming query, how do I find the time taken by a particular RDD to travel from the initial D-Stream to the final/last D-Stream? Help please !!

On Fri, Jun 19, 2015 at 12:40 AM, Tathagata Das t...@databricks.com wrote: It's not clear what you are asking. Find what among RDDs?

On Thu, Jun 18, 2015 at 11:24 AM, anshu shukla anshushuk...@gmail.com wrote: Is there any fixed way to find among RDDs in stream processing systems, in the distributed set-up? -- Thanks Regards, Anshu Shukla -- Thanks Regards, Anshu Shukla -- Thanks Regards, Anshu Shukla -- Thanks Regards, Anshu Shukla
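A rough Java sketch of TD's second (precise) option: stamp each record with the current time as close to the receiver as possible and compare at the last stage. The "ts@payload" layout and the collect()-based reporting are illustrative only, and it assumes driver and executor clocks are reasonably in sync:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaDStream;

public class LatencyProbe {
  // call this on the DStream as close to the receiver as possible
  public static JavaDStream<String> stamp(JavaDStream<String> in) {
    return in.map(new Function<String, String>() {
      public String call(String s) { return System.currentTimeMillis() + "@" + s; }
    });
  }

  // call this on the final DStream, after all the heavy transformations
  public static void report(JavaDStream<String> out) {
    out.foreachRDD(new Function<JavaRDD<String>, Void>() {
      public Void call(JavaRDD<String> rdd) {
        for (String s : rdd.collect()) {                    // fine for small batches or sampling
          long sent = Long.parseLong(s.split("@", 2)[0]);
          System.out.println("latency ms: " + (System.currentTimeMillis() - sent));
        }
        return null;
      }
    });
  }
}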
Latency between the RDD in Streaming
Is there any fixed way to find among RDDs in stream processing systems, in the distributed set-up? -- Thanks Regards, Anshu Shukla
Re: Latency between the RDD in Streaming
Sorry, I missed the word LATENCY .. for a large streaming query, how do I find the time taken by a particular RDD to travel from the initial D-Stream to the final/last D-Stream? Help please !! On Fri, Jun 19, 2015 at 12:40 AM, Tathagata Das t...@databricks.com wrote: It's not clear what you are asking. Find what among RDDs? On Thu, Jun 18, 2015 at 11:24 AM, anshu shukla anshushuk...@gmail.com wrote: Is there any fixed way to find among RDDs in stream processing systems, in the distributed set-up? -- Thanks Regards, Anshu Shukla -- Thanks Regards, Anshu Shukla
Re: Latency between the RDD in Streaming
Thanks a lot, but I have already tried the second way. The problem with that is how to identify a particular RDD from source to sink (as we can do by passing a msg id in Storm). For that I just updated the RDD and added a msgID (as a static variable), but while dumping them to a file some of the tuples of the RDD are failed/missed (approx 3000, and the data rate is approx 1500 tuples/sec).

On Fri, Jun 19, 2015 at 2:50 AM, Tathagata Das t...@databricks.com wrote: Couple of ways. 1. Easy but approximate way: Find the scheduling delay and processing time using the StreamingListener interface, and then calculate end-to-end delay = 0.5 * batch interval + scheduling delay + processing time. The 0.5 * batch interval is the approximate average batching delay across all the records in the batch. 2. Hard but precise way: You could build a custom receiver that embeds the current timestamp in the records, and then compare them with the timestamp at the final step of the records. Assuming the executor and driver clocks are reasonably in sync, this will measure the latency between the time the record is received by the system and the time the result from the record is available.

On Thu, Jun 18, 2015 at 2:12 PM, anshu shukla anshushuk...@gmail.com wrote: Sorry, I missed the word LATENCY .. for a large streaming query, how do I find the time taken by a particular RDD to travel from the initial D-Stream to the final/last D-Stream? Help please !!

On Fri, Jun 19, 2015 at 12:40 AM, Tathagata Das t...@databricks.com wrote: It's not clear what you are asking. Find what among RDDs?

On Thu, Jun 18, 2015 at 11:24 AM, anshu shukla anshushuk...@gmail.com wrote: Is there any fixed way to find among RDDs in stream processing systems, in the distributed set-up? -- Thanks Regards, Anshu Shukla -- Thanks Regards, Anshu Shukla -- Thanks Regards, Anshu Shukla
Re: Multiple executors writing file using java filewriter
Thanks for the reply !! Yes, either it should write on any machine of the cluster, or can you please help me with how to do this? Previously I was writing using collect(), so some of my tuples are missing while writing.

// previous logic that was just creating the file on the master -
newinputStream.foreachRDD(new Function2<JavaRDD<String>, Time, Void>() {
    @Override
    public Void call(JavaRDD<String> v1, Time v2) throws Exception {
        for (String s : v1.collect()) {
            // System.out.println("v1 here is " + v1 + " --- " + s);
            spoutlog.batchLogwriter(System.currentTimeMillis(), "spout-MSGID," + msgeditor.getMessageId(s));
            // System.out.println(msgeditor.getMessageId(s));
        }
        return null;
    }
});

On Mon, Jun 22, 2015 at 11:31 PM, Richard Marscher rmarsc...@localytics.com wrote: Is spoutLog just a non-Spark file writer? If you run that in the map call on a cluster it's going to be writing in the filesystem of the executor it's being run on. I'm not sure if that's what you intended.

On Mon, Jun 22, 2015 at 1:35 PM, anshu shukla anshushuk...@gmail.com wrote: Running perfectly on the local system but not writing to the file in cluster mode. Any suggestions please ..

// msgId is a long counter
JavaDStream<String> newinputStream = inputStream.map(new Function<String, String>() {
    @Override
    public String call(String v1) throws Exception {
        String s1 = msgId + "@" + v1;
        System.out.println(s1);
        msgId++;
        try {
            // filewriter logic
            spoutlog.batchLogwriter(System.currentTimeMillis(), "spout-MSGID," + msgeditor.getMessageId(s1));
        } catch (Exception e) {
            System.out.println("exception is here");
            e.printStackTrace();
            throw e;
        }
        System.out.println("msgid," + msgId);
        return msgeditor.addMessageId(v1, msgId);
    }
});
-- Thanks Regards, Anshu Shukla

On Mon, Jun 22, 2015 at 10:50 PM, anshu shukla anshushuk...@gmail.com wrote: Can we not write some data to a txt file in parallel with multiple executors running in parallel?? -- Thanks Regards, Anshu Shukla -- Thanks Regards, Anshu Shukla -- Thanks Regards, Anshu Shukla
Calculating tuple count /input rate with time
I am calculating the input rate using the following logic, and I think this foreachRDD is always running on the driver (the printlns are seen on the driver). 1- Is there any other way to do this at lower cost? 2- Will this give me the correct count for the rate?

// code -
inputStream.foreachRDD(new Function<JavaRDD<String>, Void>() {
    @Override
    public Void call(JavaRDD<String> stringJavaRDD) throws Exception {
        System.out.println(System.currentTimeMillis() + ",spoutstringJavaRDD," + stringJavaRDD.count());
        return null;
    }
});
-- Thanks Regards, Anshu Shukla
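If only the per-batch record count is needed, DStream.count() does the counting distributed and leaves a single number per batch for the driver to print; a sketch reusing the same inputStream:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;

public class RateLogger {
  public static void log(JavaDStream<String> inputStream) {
    inputStream.count().foreachRDD(new Function2<JavaRDD<Long>, Time, Void>() {
      public Void call(JavaRDD<Long> counts, Time time) {
        for (Long c : counts.collect()) {              // one element per batch
          System.out.println(time.milliseconds() + ",spout-rate," + c);
        }
        return null;
      }
    });
  }
}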
Re: Multiple executors writing file using java filewriter
Thanks a lot, because I just want to log the timestamp and a unique message id, not the full RDD.

On Tue, Jun 23, 2015 at 12:41 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Why don't you do a normal .saveAsTextFiles? Thanks Best Regards

On Mon, Jun 22, 2015 at 11:55 PM, anshu shukla anshushuk...@gmail.com wrote: Thanks for the reply !! Yes, either it should write on any machine of the cluster, or can you please help me with how to do this? Previously I was writing using collect(), so some of my tuples are missing while writing.

// previous logic that was just creating the file on the master -
newinputStream.foreachRDD(new Function2<JavaRDD<String>, Time, Void>() {
    @Override
    public Void call(JavaRDD<String> v1, Time v2) throws Exception {
        for (String s : v1.collect()) {
            // System.out.println("v1 here is " + v1 + " --- " + s);
            spoutlog.batchLogwriter(System.currentTimeMillis(), "spout-MSGID," + msgeditor.getMessageId(s));
            // System.out.println(msgeditor.getMessageId(s));
        }
        return null;
    }
});

On Mon, Jun 22, 2015 at 11:31 PM, Richard Marscher rmarsc...@localytics.com wrote: Is spoutLog just a non-Spark file writer? If you run that in the map call on a cluster it's going to be writing in the filesystem of the executor it's being run on. I'm not sure if that's what you intended.

On Mon, Jun 22, 2015 at 1:35 PM, anshu shukla anshushuk...@gmail.com wrote: Running perfectly on the local system but not writing to the file in cluster mode. Any suggestions please ..

// msgId is a long counter
JavaDStream<String> newinputStream = inputStream.map(new Function<String, String>() {
    @Override
    public String call(String v1) throws Exception {
        String s1 = msgId + "@" + v1;
        System.out.println(s1);
        msgId++;
        try {
            // filewriter logic
            spoutlog.batchLogwriter(System.currentTimeMillis(), "spout-MSGID," + msgeditor.getMessageId(s1));
        } catch (Exception e) {
            System.out.println("exception is here");
            e.printStackTrace();
            throw e;
        }
        System.out.println("msgid," + msgId);
        return msgeditor.addMessageId(v1, msgId);
    }
});
-- Thanks Regards, Anshu Shukla

On Mon, Jun 22, 2015 at 10:50 PM, anshu shukla anshushuk...@gmail.com wrote: Can we not write some data to a txt file in parallel with multiple executors running in parallel?? -- Thanks Regards, Anshu Shukla -- Thanks Regards, Anshu Shukla -- Thanks Regards, Anshu Shukla -- Thanks Regards, Anshu Shukla
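A minimal sketch of Akhil's suggestion: let Spark write the per-batch logs itself with saveAsTextFiles instead of a shared FileWriter, so each partition writes its own part file. The output prefix below is a made-up local path; an HDFS or S3 URI works the same way:

import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaDStream;

public class BatchLogger {
  public static void log(JavaDStream<String> newinputStream) {
    JavaDStream<String> stamped = newinputStream.map(new Function<String, String>() {
      public String call(String s) { return System.currentTimeMillis() + "," + s; }
    });
    // one directory of part files per batch: <prefix>-<batchTime>.<suffix>
    stamped.dstream().saveAsTextFiles("/tmp/spout-msgid", "log");
  }
}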
Using queueStream
JavaDStream<String> inputStream = ssc.queueStream(rddQueue); Can this rddQueue be dynamic in nature? If yes, then how to make it run until rddQueue is finished? Is there any other way to get an rddQueue from a dynamically updatable normal Queue? -- Thanks Regards, SERC-IISC Anshu Shukla
Implementing and Using a Custom Actor-based Receiver
Is there any good sample code in Java for *implementing and using a custom actor-based receiver*? -- Thanks Regards, Anshu Shukla
Problem: Custom Receiver for getting events from a Dynamic Queue
I have written a custom receiver for converting the tuples in the dynamic queue/EventGen to a DStream, but I don't know why it only processes data for some time (3-4 sec.) and then shows the queue as empty. Any suggestions please ..

// code
public class JavaCustomReceiver extends Receiver<String> implements ISyntheticEventGen {

    EventGen eventGen;
    BlockingQueue<List<String>> eventQueue;
    String csvFileName;
    String outSpoutCSVLogFileName;
    double scalingFactor;

    public JavaCustomReceiver(String csvFileName, String outSpoutCSVLogFileName, double scalingFactor) {
        super(StorageLevel.MEMORY_AND_DISK());
        this.csvFileName = csvFileName;
        this.outSpoutCSVLogFileName = outSpoutCSVLogFileName;
        this.scalingFactor = scalingFactor;
        this.eventGen = new EventGen(this, this.scalingFactor);
        this.eventGen.launch(this.csvFileName, this.outSpoutCSVLogFileName); // Launch threads
        this.eventQueue = new LinkedBlockingQueue<List<String>>();
        System.out.println("for watching queue");
    }

    public void onStart() {
        // Start the thread that receives data over a connection
        new Thread() {
            @Override
            public void run() {
                receive();
            }
        }.start();
    }

    public void onStop() {
        // There is nothing much to do as the thread calling receive()
        // is designed to stop by itself if isStopped() returns false
    }

    /** Create a socket connection and receive data until the receiver is stopped */
    private void receive() {
        try {
            // connect to the server
            // socket = new Socket(host, port);
            // BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            // Until stopped or connection broken continue reading
            while (!isStopped()) {
                List<String> entry = this.eventQueue.take();
                String str = "";
                for (String s : entry)
                    str += s + ",";
                System.out.println("Received data '" + str + "'");
                store(str);
            }
            // Restart in an attempt to connect again when server is active again
            restart("Trying to connect again");
        } catch (Throwable t) {
            // restart if there is any other error
            restart("Error receiving data", t);
        }
    }

    @Override
    public StorageLevel storageLevel() {
        return StorageLevel.MEMORY_AND_DISK();
    }

    @Override
    public void receive(List<String> event) {
        // TODO Auto-generated method stub
        // System.out.println("Called IN SPOUT### ");
        try {
            this.eventQueue.put(event);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}
-- Thanks Regards, Anshu Shukla
Union of two different types of DStreams
How to take the union of a JavaPairDStream<String, Integer> and a JavaDStream<String>? *a.union(b) works only with DStreams of the same type.* -- Thanks Regards, Anshu Shukla
Re: Applying functions over a certain count of tuples
Thanks for that. One more doubt: how to perform different logic/operations over a DStream with two types of stream-ids in it. *It's like using stream-ids in Storm and a fork type of thing (different logic for both edges at the same time without using 2 separate filters).*

On Tue, Jun 30, 2015 at 1:38 AM, Richard Marscher rmarsc...@localytics.com wrote: Hi, not sure what the context is but I think you can do something similar with mapPartitions:

rdd.mapPartitions { iterator =>
  iterator.grouped(5).map { tupleGroup => emitOneRddForGroup(tupleGroup) }
}

The edge case is when the final grouping doesn't have exactly 5 items, if that matters.

On Mon, Jun 29, 2015 at 3:57 PM, anshu shukla anshushuk...@gmail.com wrote: I want to apply some logic on the basis of a FIXED count of the number of tuples in each RDD. *Suppose emit one RDD for every 5 tuples of the previous RDD.* -- Thanks Regards, Anshu Shukla -- Thanks Regards, Anshu Shukla
Applying functions over a certain count of tuples
I want to apply some logic on the basis of a FIXED count of the number of tuples in each RDD. *Suppose emit one RDD for every 5 tuples of the previous RDD.* -- Thanks Regards, Anshu Shukla
Re: Graceful shutdown for Spark Streaming
Yes, I was doing the same; if you mean that this is the correct way to do it, then I will verify it once more in my case.

On Thu, Jul 30, 2015 at 1:02 PM, Tathagata Das t...@databricks.com wrote: How is sleep not working? Are you doing streamingContext.start(); Thread.sleep(xxx); streamingContext.stop()?

On Wed, Jul 29, 2015 at 6:55 PM, anshu shukla anshushuk...@gmail.com wrote: If we want to stop the application after a fixed time period, how will that work? (How to give the duration in the logic; in my case sleep(t.s.) is not working.) So I used to kill the CoarseGrained job at each slave by a script. Please suggest something.

On Thu, Jul 30, 2015 at 5:14 AM, Tathagata Das t...@databricks.com wrote: StreamingContext.stop(stopGracefully = true) stops the streaming context gracefully. Then you can safely terminate the Spark cluster. They are two different steps and need to be done separately, ensuring that the driver process has been completely terminated before the Spark cluster is terminated.

On Wed, Jul 29, 2015 at 6:43 AM, Michal Čizmazia mici...@gmail.com wrote: How to initiate graceful shutdown from outside of the Spark Streaming driver process? Both for the local and cluster mode of Spark Standalone as well as EMR. Does sbin/stop-all.sh stop the context gracefully? How is it done? Is there a signal sent to the driver process? For EMR, is there a way to terminate an EMR cluster with Spark Streaming graceful shutdown? Thanks! -- Thanks Regards, Anshu Shukla -- Thanks Regards, Anshu Shukla
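A minimal sketch of the fixed-duration run being discussed, using the streaming context's own timeout instead of killing executors by script (the timeout value is up to the caller):

import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class TimedRun {
  public static void runFor(JavaStreamingContext jssc, long millis) {
    jssc.start();
    jssc.awaitTerminationOrTimeout(millis);  // returns after `millis` if not stopped earlier
    jssc.stop(true, true);                   // stop the SparkContext too, gracefully
  }
}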
Re: Graceful shutdown for Spark Streaming
If we want to stop the application after a fixed time period, how will that work? (How to give the duration in the logic; in my case sleep(t.s.) is not working.) So I used to kill the CoarseGrained job at each slave by a script. Please suggest something.

On Thu, Jul 30, 2015 at 5:14 AM, Tathagata Das t...@databricks.com wrote: StreamingContext.stop(stopGracefully = true) stops the streaming context gracefully. Then you can safely terminate the Spark cluster. They are two different steps and need to be done separately, ensuring that the driver process has been completely terminated before the Spark cluster is terminated.

On Wed, Jul 29, 2015 at 6:43 AM, Michal Čizmazia mici...@gmail.com wrote: How to initiate graceful shutdown from outside of the Spark Streaming driver process? Both for the local and cluster mode of Spark Standalone as well as EMR. Does sbin/stop-all.sh stop the context gracefully? How is it done? Is there a signal sent to the driver process? For EMR, is there a way to terminate an EMR cluster with Spark Streaming graceful shutdown? Thanks! -- Thanks Regards, Anshu Shukla
Parallelism of Custom receiver in spark
1 - How to increase the level of *parallelism of a Spark Streaming custom RECEIVER*. 2 - Will ssc.receiverStream(/** anything */) *delete the data stored in Spark memory by the store(s)* logic? -- Thanks Regards, Anshu Shukla
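For question 1, one pattern from the Spark Streaming guide is to run several copies of the receiver and union the resulting streams, so that ingestion itself is parallel; a sketch reusing the JavaCustomReceiver from the earlier mail (constructor arguments and receiver count are illustrative):

import java.util.ArrayList;
import java.util.List;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class ParallelReceivers {
  public static JavaDStream<String> build(JavaStreamingContext jssc, int numReceivers) {
    List<JavaDStream<String>> streams = new ArrayList<JavaDStream<String>>();
    for (int i = 0; i < numReceivers; i++) {
      streams.add(jssc.receiverStream(new JavaCustomReceiver("data.csv", "spout.log", 1.0)));
    }
    // union all receiver streams into a single DStream for the rest of the job
    return jssc.union(streams.get(0), streams.subList(1, streams.size()));
  }
}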
ReceiverStream in Spark not able to cope with 20,000 events/sec
My eventGen is emitting 20,000 events/sec, and I am using store(s1) in the receive() method to push data to the receiverStream. But this logic works fine only up to 4000 events/sec; no batches are seen being emitted for higher rates.

*CODE: TOPOLOGY -*
JavaDStream<String> sourcestream = ssc.receiverStream(new TetcCustomEventReceiver(datafilename, spoutlog, argumentClass.getScalingFactor(), datasetType));

*CODE: TetcCustomEventReceiver -*
public void receive(List<String> event) {
    StringBuffer tuple = new StringBuffer();
    msgId++;
    for (String s : event) {
        tuple.append(s).append(",");
    }
    String s1 = MsgIdAddandRemove.addMessageId(tuple.toString(), msgId);
    store(s1);
}
-- Thanks Regards, Anshu Shukla
Re: Ordering of Batches in Spark streaming
Can anyone give some insight into HOW SPARK DOES *ORDERING OF BATCHES*?

On Sat, Jul 11, 2015 at 9:19 AM, anshu shukla anshushuk...@gmail.com wrote: Thanks Ayan, I was curious to know *how Spark does it*. Is there any *documentation* where I can get the details about that? Will you please point me to some detailed link etc.? Maybe it does something like *transactional topologies in Storm* (https://storm.apache.org/documentation/Transactional-topologies.html).

On Sat, Jul 11, 2015 at 9:13 AM, ayan guha guha.a...@gmail.com wrote: AFAIK, it is guaranteed that batch t+1 will not start processing until batch t is done. Ordering within a batch - what do you mean by that? In essence, the (mini) batch will get distributed in partitions like a normal RDD, so following rdd.zipWithIndex should give a way to order them by the time they are received.

On Sat, Jul 11, 2015 at 12:50 PM, anshu shukla anshushuk...@gmail.com wrote: Hey, is there any *guarantee of fixed ordering among the batches/RDDs*? After searching a lot I found there is no ordering by default (from the framework itself), not only *batch-wise* but *also ordering within batches*. But I wonder whether there is any change from old Spark versions to Spark 1.4 in this context. Any comments please !! -- Thanks Regards, Anshu Shukla -- Best Regards, Ayan Guha -- Thanks Regards, Anshu Shukla -- Thanks Regards, Anshu Shukla
REST API to avoid Spark context creation
I have a web-based application for analytics over data stored in HBase. A user can query over any fixed time duration of data, but the response time for that query is about ~40 sec. On every request most of the time is wasted in context creation and job submission. 1- How can I avoid context creation for every job? 2- Can I have something like a pool to serve requests? -- Thanks & Regards, Anshu Shukla
Ordering of Batches in Spark streaming
Hey, is there any *guarantee of fixed ordering among the batches/RDDs*? After searching a lot I found there is no ordering by default (from the framework itself), not only *batch-wise* but *also ordering within batches*. But I wonder whether there is any change from old Spark versions to Spark 1.4 in this context. Any comments please !! -- Thanks Regards, Anshu Shukla
Re: Ordering of Batches in Spark streaming
Thanks Ayan, I was curious to know *how Spark does it*. Is there any *documentation* where I can get the details about that? Will you please point me to some detailed link etc.? Maybe it does something like *transactional topologies in Storm* (https://storm.apache.org/documentation/Transactional-topologies.html).

On Sat, Jul 11, 2015 at 9:13 AM, ayan guha guha.a...@gmail.com wrote: AFAIK, it is guaranteed that batch t+1 will not start processing until batch t is done. Ordering within a batch - what do you mean by that? In essence, the (mini) batch will get distributed in partitions like a normal RDD, so following rdd.zipWithIndex should give a way to order them by the time they are received.

On Sat, Jul 11, 2015 at 12:50 PM, anshu shukla anshushuk...@gmail.com wrote: Hey, is there any *guarantee of fixed ordering among the batches/RDDs*? After searching a lot I found there is no ordering by default (from the framework itself), not only *batch-wise* but *also ordering within batches*. But I wonder whether there is any change from old Spark versions to Spark 1.4 in this context. Any comments please !! -- Thanks Regards, Anshu Shukla -- Best Regards, Ayan Guha -- Thanks Regards, Anshu Shukla
Error while taking union
Hi all, I want to create a union of 2 DStreams: in one of them an *RDD is created every 1 second*, while the other has RDDs generated by reduceByKeyAndWindow with the *window duration set to 60 sec* (slide duration also 60 sec). The main idea is to do some analysis on every minute of data and emit the union of the input data (per sec.) and the transformed data (per min.).
Setting number of CORES from inside the topology (Java code)
Hey, I need to set the number of cores from inside the topology. It works fine when set in spark-env.sh, but I am unable to do it by setting a key/value on the conf.

SparkConf sparkConf = new SparkConf().setAppName("JavaCustomReceiver").setMaster("local[4]");
if (toponame.equals("IdentityTopology")) {
    sparkConf.setExecutorEnv("SPARK_WORKER_CORES", "1");
}
-- Thanks Regards, Anshu Shukla
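If the goal is to cap how many cores the application takes on a standalone cluster, the usual knob is the spark.cores.max property on the SparkConf rather than an executor environment variable; a hedged sketch mirroring the snippet above (assumes the app is submitted to the standalone master, not local mode):

import org.apache.spark.SparkConf;

SparkConf sparkConf = new SparkConf().setAppName("JavaCustomReceiver");
if (toponame.equals("IdentityTopology")) {
    sparkConf.set("spark.cores.max", "1");   // total cores the application may take across the cluster
}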
Resource allocation in SPARK streaming
I am not very clear about resource allocation (CPU/core/thread-level allocation) with respect to the parallelism obtained by setting the number of cores in Spark standalone mode. Any guidelines for that? -- Thanks & Regards, Anshu Shukla
Correlation between 2 consecutive RDDs in a DStream
1- Is there any way to make pairs of consecutive RDDs from a DStream: DStream ---> DStream<RDD1, RDD2>, so that I can use the already defined correlation function in Spark? *The aim is to find the auto-correlation value in Spark (as per my knowledge Spark Streaming does not support this).* -- Thanks & Regards, Anshu Shukla
Moving avg in Spark streaming
Is there any formal way to do a moving average over a fixed window duration? I calculated a simple moving average by creating a count stream and a sum stream, then joined them and finally calculated the mean. This was not per time window, since the time periods were part of the tuples. -- Thanks & Regards, Anshu Shukla
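A rough Java sketch of the same sum-and-count idea over a proper sliding window: pair each value with (value, 1), reduce with reduceByKeyAndWindow, then divide. The 60s/10s durations (both must be multiples of the batch interval) and the single "metric" key are made-up values, and the input is assumed to be a DStream of doubles:

import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import scala.Tuple2;

public class MovingAverage {
  public static JavaDStream<Double> of(JavaDStream<Double> values) {
    // (key, (sum, count)) per element
    JavaPairDStream<String, Tuple2<Double, Long>> sumAndCount = values.mapToPair(
      new PairFunction<Double, String, Tuple2<Double, Long>>() {
        public Tuple2<String, Tuple2<Double, Long>> call(Double v) {
          return new Tuple2<String, Tuple2<Double, Long>>("metric", new Tuple2<Double, Long>(v, 1L));
        }
      });

    // fold sums and counts over the sliding window
    JavaPairDStream<String, Tuple2<Double, Long>> windowed = sumAndCount.reduceByKeyAndWindow(
      new Function2<Tuple2<Double, Long>, Tuple2<Double, Long>, Tuple2<Double, Long>>() {
        public Tuple2<Double, Long> call(Tuple2<Double, Long> a, Tuple2<Double, Long> b) {
          return new Tuple2<Double, Long>(a._1() + b._1(), a._2() + b._2());
        }
      }, Durations.seconds(60), Durations.seconds(10));   // window length, slide interval

    // sum / count = mean over the window
    return windowed.map(new Function<Tuple2<String, Tuple2<Double, Long>>, Double>() {
      public Double call(Tuple2<String, Tuple2<Double, Long>> kv) {
        return kv._2()._1() / kv._2()._2();
      }
    });
  }
}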