Some more info

val lines = ssc.socketStream() // works
val lines = ssc.receiverStream(new NiFiReceiver(conf, StorageLevel.MEMORY_AND_DISK_SER_2)) // does not work

Margus (margusja) Roo
http://margus.roo.ee
skype: margusja
https://www.facebook.com/allan.tuuring
+372 51 48 780

On 15/09/2017 21:50, Margus Roo wrote:

Hi

I tested the spark.streaming.receiver.maxRate and spark.streaming.backpressure.enabled settings using socketStream, and they work.

But when I use nifi-spark-receiver (https://mvnrepository.com/artifact/org.apache.nifi/nifi-spark-receiver), spark.streaming.receiver.maxRate is not applied.

Is there any workaround?

On 14/09/2017 09:57, Margus Roo wrote:

Hi

I am using Spark 2.1.1.2.6-1.0-129 (from the Hortonworks distribution), Scala 2.11.8 and Java 1.8.0_60.

I have a NiFi flow that produces more records than the Spark stream can process within the batch time. To avoid overflowing the Spark queue I wanted to try Spark Streaming backpressure (it did not work for me), so I went back to the simpler but static solution and tried spark.streaming.receiver.maxRate.

I set spark.streaming.receiver.maxRate=1. As I understand it from the Spark manual: "If the batch processing time is more than batchinterval then obviously the receiver’s memory will start filling up and will end up in throwing exceptions (most probably BlockNotFoundException). Currently there is no way to pause the receiver. Using SparkConf configuration spark.streaming.receiver.maxRate, rate of receiver can be limited." Does that mean 1 record per second?

I have very simple code:

val conf = new SiteToSiteClient.Builder().url("http://192.168.80.120:9090/nifi").portName("testing").buildConfig()
val ssc = new StreamingContext(sc, Seconds(1))

val lines = ssc.receiverStream(new NiFiReceiver(conf, StorageLevel.MEMORY_AND_DISK))
lines.print()

ssc.start()


I have loads of records waiting in the NiFi "testing" port. After I call ssc.start() I get 7-9 records per batch (batch time is 1s). Am I misunderstanding spark.streaming.receiver.maxRate?
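
For reference, a minimal sketch of how the two settings discussed in this thread could be applied programmatically in a standalone application. The object name and application name are placeholders and do not come from the original job; only the two property keys and the 1-second batch interval are taken from the messages above. The Spark configuration documentation describes spark.streaming.receiver.maxRate as a limit in records per second for each receiver, so with one receiver and 1-second batches one would expect roughly 1 record per batch if the limit were honoured.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch only: RateLimitSketch and the app name are hypothetical placeholders.
object RateLimitSketch {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setAppName("nifi-rate-limit-test")
      // Upper bound, in records per second, for each receiver.
      .set("spark.streaming.receiver.maxRate", "1")
      // Let Spark adapt the ingestion rate to the observed processing speed.
      .set("spark.streaming.backpressure.enabled", "true")

    // 1-second batches, matching the code in the message above.
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Define the input stream here (socketStream or receiverStream),
    // then call ssc.start() and ssc.awaitTermination().
  }
}
```

In spark-shell, where a SparkContext (sc) already exists, the same properties would instead be passed at launch, for example with --conf spark.streaming.receiver.maxRate=1.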


