Hi,
I am running a job that monitors the delay of a stream by counting the
records for each minute of delay. This is my code:
    .map(new RichMapFunction[String, (Long, Int)] {
      override def map(in: String): (Long, Int) = {
        try {
          val arr = in.split("\\|")
          // Key = delay in whole minutes (now minus field 10, which holds seconds); value = count of 1
          ((System.currentTimeMillis() / 1000 - arr(10).trim.toLong) / 60, 1)
        } catch {
          case e: Exception =>
            // Log the raw record; the null is removed by the filter below
            System.out.println("data has been dropped: " + in)
            null
        }
      }
    }).slotSharingGroup("kafkaSource").setParallelism(200)
    .filter(item => item != null && item._1 >= 0).slotSharingGroup("kafkaSource").setParallelism(200)

    signalSource.keyBy(f => f._1)
      .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
      .reduce { (e1, e2) => (e1._1, e1._2 + e2._2) }.setParallelism(20).slotSharingGroup("Delay")
      .addSink(new OracleSink).setParallelism(1).slotSharingGroup("OracleSink").name("OracleSinkDelay")
But there is a problem: when the data is not delayed, the keys 1, 2, 3, 4 and 5
receive so much data that the back pressure is always 1. Is there any way to
avoid this?
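To illustrate what I mean, here is a minimal standalone sketch of how the key is derived. DelayBucket and delayBucket are hypothetical names that are not part of my job, and the sample delays are made up; the arithmetic is the same as in my map function, assuming field 10 holds a timestamp in seconds. Records that arrive only a few minutes late all collapse into a handful of keys, so only a few of the 20 reduce subtasks can receive them:

```scala
// Hypothetical helper that mirrors the key computation in the map function above.
// Assumption: eventSec is the timestamp in seconds taken from field 10 of the record.
object DelayBucket {
  // Delay in whole minutes (integer division on Long)
  def delayBucket(nowSec: Long, eventSec: Long): Long = (nowSec - eventSec) / 60

  def main(args: Array[String]): Unit = {
    val nowSec = 1000000L
    // Made-up sample: records delayed by 70 s up to 330 s
    val delaysSec = Seq(70L, 100L, 130L, 200L, 260L, 330L)
    val keys = delaysSec.map(d => delayBucket(nowSec, nowSec - d))
    // Only five distinct keys come out of this sample
    println(keys.distinct.sorted.mkString(","))  // prints 1,2,3,4,5
  }
}
```

Since keyBy sends all records with the same key to the same subtask, almost all of my traffic ends up on the few subtasks that own these keys.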
Please give me some advice. Thank you so much!