大佬,不好意思,可能图片看不到,我把代码发一次,刚学习flink半个月,好多不懂,希望大佬莫嫌烦 | | 罗显宴 | | 邮箱:15927482...@163.com | 签名由网易邮箱大师定制 在2020年7月20日 20:38,罗显宴<15927482...@163.com> 写道: 不好意思,刚才发的快,没来得及解释, 这里aggregate算子主要做了一个预聚合,把窗口的个数置为一,然后用windowResult输出结果,然后对窗口分区,最后用mapState处理递增
| | 罗显宴 | | 邮箱:15927482...@163.com | 签名由网易邮箱大师定制 在2020年7月20日 14:09,罗显宴<15927482...@163.com> 写道: 不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站, | | 罗显宴 | | 邮箱:15927482...@163.com | 签名由网易邮箱大师定制 在2020年7月20日 11:47,shizk233<wangwangdaxian...@gmail.com> 写道: Hi, 累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。 Best, shizk233 罗显宴 <15927482...@163.com> 于2020年7月20日周一 上午1:18写道: 大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到,,,,这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream api,希望看到的大佬能帮我解惑一下,谢谢啦 | | 罗显宴 | | 邮箱:15927482...@163.com | 签名由 网易邮箱大师 定制
package com.bupt.main

import java.sql.Timestamp
import java.util
import java.util.Properties

import org.apache.flink.api.common.functions.{AggregateFunction, FlatMapFunction, MapFunction, RuntimeContext}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, MapState, MapStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.util.Collector
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests

/** One drug record parsed from a 10-field CSV line on the Kafka topic. */
case class Drug(id: String, ATCCode: String, MIMSType: String, name: String, other: String,
                producer: String, retailPrice: String, composition: String,
                medicineRank: String, timestamp: Long)

/** A single category occurrence at event time `timestamp`; `num` is always 1 here. */
case class IncreaseNumPerHour(category: String, num: Long, timestamp: Long)

/** Marker emitted once per (category, window-end) firing by [[WindowResult]]. */
case class ItemViewCount(category: String, windowEnd: Long)

/** Final output: cumulative number of distinct (category, window) items at `time`. */
case class IncreasePerHour(time: String, num: Long)

/**
 * Reads drug CSV records from Kafka, normalizes the MIMS category, counts the
 * cumulative number of (category, hourly-window) items, and prints an
 * incrementally increasing curve (fired every 20 seconds by the trigger).
 */
object KafkaToElasticSearch {

  def main(args: Array[String]): Unit = {
    // 1. Execution environment: event time, parallelism 1 for ordered console output.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val props = new Properties
    props.put("bootstrap.servers", "master:9092")
    props.put("zookeeper.connect", "master:2181")
    props.put("group.id", "drug-group")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("auto.offset.reset", "earliest")

    // 2. Source: CSV lines from the "drugs" topic; keep only well-formed records
    //    whose MIMSType field (index 2) is non-empty.
    val drugs = env
      .addSource(new FlinkKafkaConsumer011[String]("drugs", new SimpleStringSchema, props))
      .setParallelism(1)
      .filter { line =>
        val words = line.split(",")
        words.length == 10 && words(2).nonEmpty
      }
      .map(new MapFunction[String, Drug] {
        override def map(value: String): Drug = {
          val words = value.split(",")
          Drug(words(0), words(1), words(2), words(3), words(4), words(5), words(6),
            words(7), words(8), words(9).trim.toLong)
        }
      })
      // Event timestamps are assumed ascending, so no out-of-orderness bound is needed.
      .assignAscendingTimestamps(_.timestamp)
    // drugs.print()

    // 3. Normalize MIMSType into a single category token: strip a leading '-' and a
    //    trailing ';', then take the text up to the first space.
    val num = drugs.map { drug =>
      val temp = new StringBuilder(drug.MIMSType)
      if (temp.nonEmpty && temp.charAt(0) == '-') temp.deleteCharAt(0)
      if (temp.nonEmpty && temp.charAt(temp.length - 1) == ';') temp.deleteCharAt(temp.length - 1)
      // BUG FIX: the original did temp.substring(0, temp.indexOf(' ')), which throws
      // StringIndexOutOfBoundsException when there is no space, and left the key null
      // when temp was empty. Fall back to the whole token / "unknown" instead.
      val category =
        if (temp.isEmpty) "unknown"
        else {
          val space = temp.indexOf(' ')
          if (space < 0) temp.toString() else temp.substring(0, space)
        }
      IncreaseNumPerHour(category, 1L, drug.timestamp)
    }
    // num.print()

    // 4. Per-category hourly windows, fired every 20s so the curve updates continuously.
    //    The pre-aggregate collapses each window firing to one element; results are then
    //    re-keyed by window end so IncreaseItems can count them cumulatively.
    val result = num
      .keyBy(_.category)
      .timeWindow(Time.hours(1))
      .trigger(ContinuousEventTimeTrigger.of(Time.seconds(20)))
      .aggregate(new CountAgg, new WindowResult)
      .keyBy(_.windowEnd)
      .process(new IncreaseItems)

    result.print()
    env.execute("compute num")
  }
}

/**
 * Incremental per-window element counter (the accumulator is the running count).
 *
 * BUG FIX: the original started the accumulator at 1 and returned the constant 1
 * from both add() and merge(), so the aggregate was always 1 regardless of input.
 * A counting AggregateFunction must start at 0, add 1 per element, and sum on merge.
 * (Downstream WindowResult ignores the count, so this fix is output-compatible.)
 */
class CountAgg() extends AggregateFunction[IncreaseNumPerHour, Long, Long] {
  // Initial state: nothing counted yet.
  override def createAccumulator(): Long = 0L
  // One more element observed in this window.
  override def add(value: IncreaseNumPerHour, accumulator: Long): Long = accumulator + 1
  // Expose the accumulated count as the window result.
  override def getResult(accumulator: Long): Long = accumulator
  // Merge two partial counts (used for merging windows / session windows).
  override def merge(a: Long, b: Long): Long = a + b
}

/**
 * Emits one ItemViewCount marker per (category, window) firing. The count itself is
 * discarded downstream; only the fact that the category appeared in the window matters.
 */
class WindowResult() extends WindowFunction[Long, ItemViewCount, String, TimeWindow] {
  override def apply(key: String, window: TimeWindow, input: Iterable[Long],
                     out: Collector[ItemViewCount]): Unit =
    out.collect(ItemViewCount(key, window.getEnd))
}

/**
 * Keyed by window end: collects the distinct (category, window) markers in MapState
 * (used as a set) and, when the event-time timer just past the window end fires,
 * emits the cumulative count with a human-readable timestamp.
 */
class IncreaseItems() extends KeyedProcessFunction[Long, ItemViewCount, IncreasePerHour] {

  // Used as a set: one entry per distinct ItemViewCount seen for this window end.
  private var itemState: MapState[ItemViewCount, Int] = _

  override def open(parameters: Configuration): Unit = {
    itemState = getRuntimeContext.getMapState(
      new MapStateDescriptor[ItemViewCount, Int](
        "item-state",
        TypeInformation.of(classOf[ItemViewCount]),
        TypeInformation.of(classOf[Int])))
    // itemState = getRuntimeContext.getListState( new ListStateDescriptor[ItemViewCount]("item-state", classOf[ItemViewCount]) )
  }

  override def processElement(value: ItemViewCount,
                              ctx: KeyedProcessFunction[Long, ItemViewCount, IncreasePerHour]#Context,
                              out: Collector[IncreasePerHour]): Unit = {
    // De-duplicate via map-state keys; duplicates from repeated trigger firings collapse.
    itemState.put(value, 1)
    // Fire once the watermark passes the window end.
    ctx.timerService().registerEventTimeTimer(value.windowEnd + 1)
  }

  override def onTimer(timestamp: Long,
                       ctx: KeyedProcessFunction[Long, ItemViewCount, IncreasePerHour]#OnTimerContext,
                       out: Collector[IncreasePerHour]): Unit = {
    // BUG FIX: replaced the deprecated scala.collection.JavaConversions implicit import
    // with an explicit iterator walk, and removed the Thread.sleep(1000) that blocked
    // the task thread (and therefore checkpointing) on every timer firing.
    var count = 0L
    val it = itemState.keys().iterator()
    while (it.hasNext) { it.next(); count += 1 }
    out.collect(IncreasePerHour(new Timestamp(timestamp - 1).toString, count))
  }
}