Re: spark.streaming.receiver.maxRate

2017-09-15 Thread Margus Roo

Some more info

val lines = ssc.socketStream() // works
val lines = ssc.receiverStream(new NiFiReceiver(conf, StorageLevel.MEMORY_AND_DISK_SER_2)) // does not work
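[Editor's note: a minimal sketch of the comparison, reconstructed rather than taken from the original message. The host, port, and NiFi URL below are placeholders, and it assumes the rate limit is applied through SparkConf before the context is created.]

import org.apache.nifi.remote.client.SiteToSiteClient
import org.apache.nifi.spark.NiFiReceiver
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

// The rate limit goes on the SparkConf before any context exists.
val sparkConf = new SparkConf()
  .setAppName("MaxRateComparison")
  .setMaster("local[2]")
  .set("spark.streaming.receiver.maxRate", "1") // at most 1 record/sec per receiver

val ssc = new StreamingContext(sparkConf, Seconds(1))

// Socket receiver: the limit is honored.
val socketLines = ssc.socketTextStream("localhost", 9999) // placeholder host/port

// NiFi receiver: the limit is reportedly ignored.
val s2sConf = new SiteToSiteClient.Builder()
  .url("http://localhost:9090/nifi") // placeholder NiFi URL
  .portName("testing")
  .buildConfig()
val nifiLines = ssc.receiverStream(new NiFiReceiver(s2sConf, StorageLevel.MEMORY_AND_DISK_SER_2))

// Counting records per batch makes the difference visible.
socketLines.count().print()
nifiLines.count().print()
ssc.start()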

Margus (margusja) Roo
http://margus.roo.ee
skype: margusja
https://www.facebook.com/allan.tuuring
+372 51 48 780

On 15/09/2017 21:50, Margus Roo wrote:


Hi

I tested the spark.streaming.receiver.maxRate and 
spark.streaming.backpressure.enabled settings using socketStream, and 
they work.


But if I am using nifi-spark-receiver 
(https://mvnrepository.com/artifact/org.apache.nifi/nifi-spark-receiver), 
then it does not honor spark.streaming.receiver.maxRate.

Any workaround?

Margus (margusja) Roo
http://margus.roo.ee
skype: margusja
https://www.facebook.com/allan.tuuring
+372 51 48 780
On 14/09/2017 09:57, Margus Roo wrote:


Hi

Using Spark 2.1.1.2.6-1.0-129 (from the Hortonworks distro), Scala 
2.11.8, and Java 1.8.0_60.


I have a NiFi flow that produces more records than the Spark stream can 
process within the batch interval. To avoid overflowing Spark's queue, I 
wanted to try Spark Streaming backpressure (it did not work for me), so 
I fell back to the simpler, static solution and tried 
spark.streaming.receiver.maxRate.


I set spark.streaming.receiver.maxRate=1. As I understand it from the 
Spark manual: "If the batch processing time is more than batch interval 
then obviously the receiver’s memory will start filling up and will end 
up in throwing exceptions (most probably BlockNotFoundException). 
Currently there is no way to pause the receiver. Using SparkConf 
configuration spark.streaming.receiver.maxRate, rate of receiver can 
be limited." Does that mean 1 record per second?


I have very simple code:

val conf = new SiteToSiteClient.Builder()
  .url("http://192.168.80.120:9090/nifi")
  .portName("testing")
  .buildConfig()
val ssc = new StreamingContext(sc, Seconds(1))

val lines = ssc.receiverStream(new NiFiReceiver(conf, StorageLevel.MEMORY_AND_DISK))
lines.print()

ssc.start()


I have loads of records waiting on the NiFi testing port. After I call 
ssc.start(), I get 7-9 records per batch (batch time is 1 s). Do I 
understand spark.streaming.receiver.maxRate wrong?
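
[Editor's note: a quick worked check of the expectation here. The documented limit is maxRate records per second per receiver, and, as far as I know, the property must be set before the SparkContext is created (e.g. spark-shell --conf spark.streaming.receiver.maxRate=1), because the receiver reads it from the conf the context was built with.]

// Expected ceiling per batch = maxRate (records/sec) * batch interval (sec).
val maxRate = 1          // spark.streaming.receiver.maxRate
val batchIntervalSec = 1 // Seconds(1)
val expectedMaxPerBatch = maxRate * batchIntervalSec // = 1 record per batch
// Seeing 7-9 records per batch therefore points at the limit not being
// applied on the NiFi receiver path, not at a misreading of the setting.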


--
Margus (margusja) Roo
http://margus.roo.ee
skype: margusja
https://www.facebook.com/allan.tuuring
+372 51 48 780






Re: spark.streaming.receiver.maxRate

2017-09-15 Thread Margus Roo

Hi

I tested the spark.streaming.receiver.maxRate and 
spark.streaming.backpressure.enabled settings using socketStream, and 
they work.


But if I am using nifi-spark-receiver 
(https://mvnrepository.com/artifact/org.apache.nifi/nifi-spark-receiver), 
then it does not honor spark.streaming.receiver.maxRate.

Any workaround?

Margus (margusja) Roo
http://margus.roo.ee
skype: margusja
https://www.facebook.com/allan.tuuring
+372 51 48 780

On 14/09/2017 09:57, Margus Roo wrote:


Hi

Using Spark 2.1.1.2.6-1.0-129 (from the Hortonworks distro), Scala 
2.11.8, and Java 1.8.0_60.


I have a NiFi flow that produces more records than the Spark stream can 
process within the batch interval. To avoid overflowing Spark's queue, I 
wanted to try Spark Streaming backpressure (it did not work for me), so 
I fell back to the simpler, static solution and tried 
spark.streaming.receiver.maxRate.


I set spark.streaming.receiver.maxRate=1. As I understand it from the 
Spark manual: "If the batch processing time is more than batch interval 
then obviously the receiver’s memory will start filling up and will end 
up in throwing exceptions (most probably BlockNotFoundException). 
Currently there is no way to pause the receiver. Using SparkConf 
configuration spark.streaming.receiver.maxRate, rate of receiver can 
be limited." Does that mean 1 record per second?


I have very simple code:

val conf = new SiteToSiteClient.Builder()
  .url("http://192.168.80.120:9090/nifi")
  .portName("testing")
  .buildConfig()
val ssc = new StreamingContext(sc, Seconds(1))

val lines = ssc.receiverStream(new NiFiReceiver(conf, StorageLevel.MEMORY_AND_DISK))
lines.print()

ssc.start()


I have loads of records waiting on the NiFi testing port. After I call 
ssc.start(), I get 7-9 records per batch (batch time is 1 s). Do I 
understand spark.streaming.receiver.maxRate wrong?


--
Margus (margusja) Roo
http://margus.roo.ee
skype: margusja
https://www.facebook.com/allan.tuuring
+372 51 48 780




spark.streaming.receiver.maxRate

2017-09-14 Thread Margus Roo

Hi

Using Spark 2.1.1.2.6-1.0-129 (from the Hortonworks distro), Scala 
2.11.8, and Java 1.8.0_60.


I have a NiFi flow that produces more records than the Spark stream can 
process within the batch interval. To avoid overflowing Spark's queue, I 
wanted to try Spark Streaming backpressure (it did not work for me), so 
I fell back to the simpler, static solution and tried 
spark.streaming.receiver.maxRate.


I set spark.streaming.receiver.maxRate=1. As I understand it from the 
Spark manual: "If the batch processing time is more than batch interval 
then obviously the receiver’s memory will start filling up and will end 
up in throwing exceptions (most probably BlockNotFoundException). 
Currently there is no way to pause the receiver. Using SparkConf 
configuration spark.streaming.receiver.maxRate, rate of receiver can 
be limited." Does that mean 1 record per second?


I have very simple code:

val conf = new SiteToSiteClient.Builder()
  .url("http://192.168.80.120:9090/nifi")
  .portName("testing")
  .buildConfig()
val ssc = new StreamingContext(sc, Seconds(1))

val lines = ssc.receiverStream(new NiFiReceiver(conf, StorageLevel.MEMORY_AND_DISK))
lines.print()

ssc.start()


I have loads of records waiting on the NiFi testing port. After I call 
ssc.start(), I get 7-9 records per batch (batch time is 1 s). Do I 
understand spark.streaming.receiver.maxRate wrong?


--
Margus (margusja) Roo
http://margus.roo.ee
skype: margusja
https://www.facebook.com/allan.tuuring
+372 51 48 780



How to update python code in memory

2015-09-16 Thread Margus Roo

Hi

For example, I submitted Python code to the cluster:
bin/spark-submit --master spark://nn1:7077 SocketListen.py
Now I have discovered that I have to change something in SocketListen.py.
One way is to stop the old job and submit a new one.
Is there a way to change the code on the worker machines so that there 
is no need to submit new code?


--
Margus (margusja) Roo
http://margus.roo.ee
skype: margusja
+372 51 480





Re: Streaming linear regression example question

2015-03-16 Thread Margus Roo

Tnx for the workaround.

Margus (margusja) Roo
http://margus.roo.ee
skype: margusja
+372 51 480

On 16/03/15 06:20, Jeremy Freeman wrote:
Hi Margus, thanks for reporting this. I’ve been able to reproduce it, 
and there does indeed appear to be a bug. I’ve created a JIRA and have a 
fix ready, which can hopefully be included in 1.3.1.






Re: Streaming linear regression example question

2015-03-15 Thread Margus Roo

Hi again

I tried the same 
examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala 
from 1.3.0 and got the following. In case the testing file content is:
  (0.0,[3.0,4.0,3.0])
  (0.0,[4.0,4.0,4.0])
  (4.0,[5.0,5.0,5.0])
  (5.0,[5.0,6.0,6.0])
  (6.0,[7.0,4.0,7.0])
  (7.0,[8.0,6.0,8.0])
  (8.0,[44.0,9.0,9.0])
  (9.0,[14.0,30.0,10.0])

and the answer:
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(4.0,0.0)
(5.0,0.0)
(6.0,0.0)
(7.0,0.0)
(8.0,0.0)
(9.0,0.0)

What is wrong?
I can see that the model's weights are changing when I put new data into 
the training dir.
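
[Editor's note: a linear model predicts the dot product of its weights and the features, so a quick sanity check shows what nonzero output should look like. The weight vector below is the one logged in the quoted message further down, used purely as an example.]

// Streaming linear regression predicts w . x for each test point.
val w = Array(7.333, 7.333, 7.333) // weights logged during training (example)
val x = Array(3.0, 4.0, 3.0)       // features of the first test point above
val prediction = w.zip(x).map { case (wi, xi) => wi * xi }.sum
// prediction = 7.333 * (3.0 + 4.0 + 3.0) = 73.33, not 0.0
// All-zero predictions are consistent with the model still carrying its
// initial zero weights, i.e. the bug confirmed upthread.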


Margus (margusja) Roo
http://margus.roo.ee
skype: margusja
+372 51 480

On 14/03/15 09:05, Margus Roo wrote:

Hi

I am trying to understand the example provided in 
https://spark.apache.org/docs/1.2.1/mllib-linear-methods.html - 
Streaming linear regression.


Code:
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream

object StreamingLinReg {

  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("StreamLinReg").setMaster("local[2]")

    val ssc = new StreamingContext(conf, Seconds(10))

    val trainingData = ssc.textFileStream("/Users/margusja/Documents/workspace/sparcdemo/training/").map(LabeledPoint.parse).cache()

    val testData = ssc.textFileStream("/Users/margusja/Documents/workspace/sparcdemo/testing/").map(LabeledPoint.parse)

    val numFeatures = 3
    val model = new StreamingLinearRegressionWithSGD().setInitialWeights(Vectors.zeros(numFeatures))

    model.trainOn(trainingData)
    model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()

    ssc.start()
    ssc.awaitTermination()

  }

}

I compiled the code and ran it.
I put a file containing
  (1.0,[2.0,2.0,2.0])
  (2.0,[3.0,3.0,3.0])
  (3.0,[4.0,4.0,4.0])
  (4.0,[5.0,5.0,5.0])
  (5.0,[6.0,6.0,6.0])
  (6.0,[7.0,7.0,7.0])
  (7.0,[8.0,8.0,8.0])
  (8.0,[9.0,9.0,9.0])
  (9.0,[10.0,10.0,10.0])
into the training directory.

I can see that the model's weights change:
15/03/14 08:53:40 INFO StreamingLinearRegressionWithSGD: Current 
model: weights, [7.333,7.333,7.333]


Now I can put whatever into the testing directory, but I cannot 
understand the answer.
For example, I can put the same file I used for training into the 
testing directory. The file content is

  (1.0,[2.0,2.0,2.0])
  (2.0,[3.0,3.0,3.0])
  (3.0,[4.0,4.0,4.0])
  (4.0,[5.0,5.0,5.0])
  (5.0,[6.0,6.0,6.0])
  (6.0,[7.0,7.0,7.0])
  (7.0,[8.0,8.0,8.0])
  (8.0,[9.0,9.0,9.0])
  (9.0,[10.0,10.0,10.0])

And the answer will be:
(1.0,0.0)
(2.0,0.0)
(3.0,0.0)
(4.0,0.0)
(5.0,0.0)
(6.0,0.0)
(7.0,0.0)
(8.0,0.0)
(9.0,0.0)

And in case my file content is
  (0.0,[2.0,2.0,2.0])
  (0.0,[3.0,3.0,3.0])
  (0.0,[4.0,4.0,4.0])
  (0.0,[5.0,5.0,5.0])
  (0.0,[6.0,6.0,6.0])
  (0.0,[7.0,7.0,7.0])
  (0.0,[8.0,8.0,8.0])
  (0.0,[9.0,9.0,9.0])
  (0.0,[10.0,10.0,10.0])

the answer will be:
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)

I expect to get the label predicted by the model.
--
Margus (margusja) Roo
http://margus.roo.ee
skype: margusja
+372 51 480




Streaming linear regression example question

2015-03-14 Thread Margus Roo

Hi

I am trying to understand the example provided in 
https://spark.apache.org/docs/1.2.1/mllib-linear-methods.html - 
Streaming linear regression.


Code:
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream

object StreamingLinReg {

  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("StreamLinReg").setMaster("local[2]")

    val ssc = new StreamingContext(conf, Seconds(10))

    val trainingData = ssc.textFileStream("/Users/margusja/Documents/workspace/sparcdemo/training/").map(LabeledPoint.parse).cache()

    val testData = ssc.textFileStream("/Users/margusja/Documents/workspace/sparcdemo/testing/").map(LabeledPoint.parse)

    val numFeatures = 3
    val model = new StreamingLinearRegressionWithSGD().setInitialWeights(Vectors.zeros(numFeatures))

    model.trainOn(trainingData)
    model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()

    ssc.start()
    ssc.awaitTermination()

  }

}

I compiled the code and ran it.
I put a file containing
  (1.0,[2.0,2.0,2.0])
  (2.0,[3.0,3.0,3.0])
  (3.0,[4.0,4.0,4.0])
  (4.0,[5.0,5.0,5.0])
  (5.0,[6.0,6.0,6.0])
  (6.0,[7.0,7.0,7.0])
  (7.0,[8.0,8.0,8.0])
  (8.0,[9.0,9.0,9.0])
  (9.0,[10.0,10.0,10.0])
into the training directory.

I can see that the model's weights change:
15/03/14 08:53:40 INFO StreamingLinearRegressionWithSGD: Current model: 
weights, [7.333,7.333,7.333]


Now I can put whatever into the testing directory, but I cannot 
understand the answer.
For example, I can put the same file I used for training into the 
testing directory. The file content is

  (1.0,[2.0,2.0,2.0])
  (2.0,[3.0,3.0,3.0])
  (3.0,[4.0,4.0,4.0])
  (4.0,[5.0,5.0,5.0])
  (5.0,[6.0,6.0,6.0])
  (6.0,[7.0,7.0,7.0])
  (7.0,[8.0,8.0,8.0])
  (8.0,[9.0,9.0,9.0])
  (9.0,[10.0,10.0,10.0])

And the answer will be:
(1.0,0.0)
(2.0,0.0)
(3.0,0.0)
(4.0,0.0)
(5.0,0.0)
(6.0,0.0)
(7.0,0.0)
(8.0,0.0)
(9.0,0.0)

And in case my file content is
  (0.0,[2.0,2.0,2.0])
  (0.0,[3.0,3.0,3.0])
  (0.0,[4.0,4.0,4.0])
  (0.0,[5.0,5.0,5.0])
  (0.0,[6.0,6.0,6.0])
  (0.0,[7.0,7.0,7.0])
  (0.0,[8.0,8.0,8.0])
  (0.0,[9.0,9.0,9.0])
  (0.0,[10.0,10.0,10.0])

the answer will be:
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)

I expect to get the label predicted by the model.

--
Margus (margusja) Roo
http://margus.roo.ee
skype: margusja
+372 51 480