Re: spark.streaming.receiver.maxRate
Some more info:

    val lines = ssc.socketStream() // works
    val lines = ssc.receiverStream(new NiFiReceiver(conf, StorageLevel.MEMORY_AND_DISK_SER_2)) // does not work

Margus (margusja) Roo
http://margus.roo.ee
skype: margusja
https://www.facebook.com/allan.tuuring
+372 51 48 780

On 15/09/2017 21:50, Margus Roo wrote:
> But if I am using nifi-spark-receiver then it does not respect
> spark.streaming.receiver.maxRate. Any workaround?
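For a reproducible comparison, a minimal sketch of the two variants side by side. The host, port and stream names are illustrative, socketTextStream is used because plain socketStream also needs a converter function, and conf is the SiteToSiteClientConfig built as in the original code:

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.nifi.spark.NiFiReceiver

    // Both streams run against the same StreamingContext, so the same
    // spark.streaming.receiver.maxRate setting should apply to each receiver.
    val ssc = new StreamingContext(sc, Seconds(1))

    // Variant 1: the rate limit is respected.
    val socketLines = ssc.socketTextStream("localhost", 9999)

    // Variant 2: the rate limit appears to be ignored.
    val nifiLines = ssc.receiverStream(new NiFiReceiver(conf, StorageLevel.MEMORY_AND_DISK_SER_2))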
Re: spark.streaming.receiver.maxRate
Hi

I tested the spark.streaming.receiver.maxRate and spark.streaming.backpressure.enabled settings using socketStream, and they work.

But if I am using nifi-spark-receiver (https://mvnrepository.com/artifact/org.apache.nifi/nifi-spark-receiver) then it does not respect spark.streaming.receiver.maxRate.

Any workaround?

Margus (margusja) Roo
http://margus.roo.ee
skype: margusja
https://www.facebook.com/allan.tuuring
+372 51 48 780

On 14/09/2017 09:57, Margus Roo wrote:
> I set spark.streaming.receiver.maxRate=1 ... After ssc.start() I get 7-9
> records per batch (batch time is 1 s). Do I understand
> spark.streaming.receiver.maxRate wrong?
spark.streaming.receiver.maxRate
Hi

Using Spark 2.1.1.2.6-1.0-129 (from the Hortonworks distro), Scala 2.11.8 and Java 1.8.0_60.

I have a NiFi flow that produces more records than the Spark stream can process within the batch time. To avoid overflowing the Spark queue I first tried Spark Streaming backpressure (it did not work for me), so I went back to the simpler, static solution and tried spark.streaming.receiver.maxRate. I set spark.streaming.receiver.maxRate=1. As I understand it from the Spark manual: "If the batch processing time is more than batch interval then obviously the receiver's memory will start filling up and will end up in throwing exceptions (most probably BlockNotFoundException). Currently there is no way to pause the receiver. Using SparkConf configuration spark.streaming.receiver.maxRate, rate of receiver can be limited." Does that mean 1 record per second?

I have very simple code:

    val conf = new SiteToSiteClient.Builder()
      .url("http://192.168.80.120:9090/nifi")
      .portName("testing")
      .buildConfig()
    val ssc = new StreamingContext(sc, Seconds(1))
    val lines = ssc.receiverStream(new NiFiReceiver(conf, StorageLevel.MEMORY_AND_DISK))
    lines.print()
    ssc.start()

I have loads of records waiting in the NiFi "testing" port. After ssc.start() I get 7-9 records per batch (batch time is 1 s). Do I understand spark.streaming.receiver.maxRate wrong?

--
Margus (margusja) Roo
http://margus.roo.ee
skype: margusja
https://www.facebook.com/allan.tuuring
+372 51 48 780
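For reference, a minimal sketch of how these limits are set. Both keys are standard Spark Streaming settings; the application name is illustrative:

    import org.apache.spark.{SparkConf, SparkContext}

    // spark.streaming.receiver.maxRate is records per second *per receiver*,
    // so "1" should mean at most one record per one-second batch.
    val sparkConf = new SparkConf()
      .setAppName("NiFiMaxRateTest") // illustrative name
      .set("spark.streaming.receiver.maxRate", "1")
      .set("spark.streaming.backpressure.enabled", "true") // the dynamic alternative tried first
    val sc = new SparkContext(sparkConf)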
How to update python code in memory
Hi

For example, I submitted Python code to the cluster:

    bin/spark-submit --master spark://nn1:7077 SocketListen.py

Now I discovered that I have to change something in SocketListen.py. One way is to stop the old job and submit a new one. Is there a way to update the code on the worker machines so that there is no need to submit new code?

--
Margus (margusja) Roo
http://margus.roo.ee
skype: margusja
+372 51 480
Re: Streaming linear regression example question
Thanks for the workaround.

Margus (margusja) Roo
http://margus.roo.ee
skype: margusja
+372 51 480

On 16/03/15 06:20, Jeremy Freeman wrote:
> Hi Margus, thanks for reporting this, I've been able to reproduce and
> there does indeed appear to be a bug. I've created a JIRA and have a fix
> ready, can hopefully include in 1.3.1.
Re: Streaming linear regression example question
Hi again

I tried the same examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala from 1.3.0. In case the testing file content is:

    (0.0,[3.0,4.0,3.0])
    (0.0,[4.0,4.0,4.0])
    (4.0,[5.0,5.0,5.0])
    (5.0,[5.0,6.0,6.0])
    (6.0,[7.0,4.0,7.0])
    (7.0,[8.0,6.0,8.0])
    (8.0,[44.0,9.0,9.0])
    (9.0,[14.0,30.0,10.0])

the answer is:

    (0.0,0.0)
    (0.0,0.0)
    (0.0,0.0)
    (4.0,0.0)
    (5.0,0.0)
    (6.0,0.0)
    (7.0,0.0)
    (8.0,0.0)
    (9.0,0.0)

What is wrong? I can see that the model's weights are changing when I put new data into the training dir.

Margus (margusja) Roo
http://margus.roo.ee
skype: margusja
+372 51 480

On 14/03/15 09:05, Margus Roo wrote:
> I try to understand the Streaming linear regression example provided in
> https://spark.apache.org/docs/1.2.1/mllib-linear-methods.html ...
> I expect to get the label predicted by the model.
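One way to check whether the model itself is learning, independent of the streaming prediction path, is to query the latest trained model directly. A minimal sketch, assuming the model value from the StreamingLinearRegressionWithSGD setup quoted below; the feature vector is illustrative:

    import org.apache.spark.mllib.linalg.Vectors

    // latestModel() returns the underlying LinearRegressionModel.
    val latest = model.latestModel()
    println(s"weights: ${latest.weights}, intercept: ${latest.intercept}")

    // Predict one point outside the streaming path.
    println(latest.predict(Vectors.dense(5.0, 5.0, 5.0)))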
Streaming linear regression example question
Hi

I am trying to understand the Streaming linear regression example provided in https://spark.apache.org/docs/1.2.1/mllib-linear-methods.html.

Code:

    import org.apache.spark._
    import org.apache.spark.streaming._
    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.LabeledPoint
    import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD

    object StreamingLinReg {
      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("StreamLinReg").setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(10))
        val trainingData = ssc.textFileStream("/Users/margusja/Documents/workspace/sparcdemo/training/")
          .map(LabeledPoint.parse).cache()
        val testData = ssc.textFileStream("/Users/margusja/Documents/workspace/sparcdemo/testing/")
          .map(LabeledPoint.parse)
        val numFeatures = 3
        val model = new StreamingLinearRegressionWithSGD()
          .setInitialWeights(Vectors.zeros(numFeatures))
        model.trainOn(trainingData)
        model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()
        ssc.start()
        ssc.awaitTermination()
      }
    }

I compiled the code and ran it, then put a file containing

    (1.0,[2.0,2.0,2.0])
    (2.0,[3.0,3.0,3.0])
    (3.0,[4.0,4.0,4.0])
    (4.0,[5.0,5.0,5.0])
    (5.0,[6.0,6.0,6.0])
    (6.0,[7.0,7.0,7.0])
    (7.0,[8.0,8.0,8.0])
    (8.0,[9.0,9.0,9.0])
    (9.0,[10.0,10.0,10.0])

into the training directory. I can see that the model's weights change:

    15/03/14 08:53:40 INFO StreamingLinearRegressionWithSGD: Current model: weights, [7.333,7.333,7.333]

Now I can put whatever I want into the testing directory, but I cannot understand the answer. For example, I can put the same file I used for training into the testing directory, and the answer is:

    (1.0,0.0)
    (2.0,0.0)
    (3.0,0.0)
    (4.0,0.0)
    (5.0,0.0)
    (6.0,0.0)
    (7.0,0.0)
    (8.0,0.0)
    (9.0,0.0)

And in case my file content is:

    (0.0,[2.0,2.0,2.0])
    (0.0,[3.0,3.0,3.0])
    (0.0,[4.0,4.0,4.0])
    (0.0,[5.0,5.0,5.0])
    (0.0,[6.0,6.0,6.0])
    (0.0,[7.0,7.0,7.0])
    (0.0,[8.0,8.0,8.0])
    (0.0,[9.0,9.0,9.0])
    (0.0,[10.0,10.0,10.0])

the answer is:

    (0.0,0.0)
    (0.0,0.0)
    (0.0,0.0)
    (0.0,0.0)
    (0.0,0.0)
    (0.0,0.0)
    (0.0,0.0)
    (0.0,0.0)
    (0.0,0.0)

I expect to get the label predicted by the model.

--
Margus (margusja) Roo
http://margus.roo.ee
skype: margusja
+372 51 480
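Note that predictOnValues emits (key, prediction) pairs, and in this code the key is the original label from the test file, so the first number in each output tuple comes from the input data, not from the model; only the second number is predicted. For reference, a standalone sketch of the text format LabeledPoint.parse expects:

    import org.apache.spark.mllib.regression.LabeledPoint

    // Each line of the training/testing files is parsed like this:
    val lp = LabeledPoint.parse("(1.0,[2.0,2.0,2.0])")
    println(lp.label)    // 1.0
    println(lp.features) // [2.0,2.0,2.0]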