Sorry, the image probably didn't come through, so I'm sending the code again. I've only been learning Flink for half a month and there's a lot I don't understand yet; I hope you don't mind the trouble.
罗显宴
15927482...@163.com
On July 20, 2020 at 20:38, 罗显宴 <15927482...@163.com> wrote:
Sorry, I sent that in a hurry and didn't get to explain. The aggregate operator here mainly does a pre-aggregation that reduces each window's count to one; WindowResult then emits the result, the stream is re-partitioned by window end, and finally a MapState handles the running increase.


On July 20, 2020 at 14:09, 罗显宴 <15927482...@163.com> wrote:


No, it accumulates continuously. For example, I crawl drug data from a pharmaceutical website and count the newly crawled drug categories every hour, and it keeps going like that; once one site has been fully crawled, I can switch to another.
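
A sketch of the semantics being described, a running count of distinct categories ever seen, using keyed state. This is a hypothetical illustration, not code from this thread; the name DistinctCategoryCount is made up, and it assumes the IncreaseNumPerHour events from the code below:

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

// Emits the new running total each time a never-before-seen category arrives.
// Must run on a keyed stream, e.g. num.keyBy(_ => "all").flatMap(new DistinctCategoryCount)
class DistinctCategoryCount extends RichFlatMapFunction[IncreaseNumPerHour, Long] {
  private var seen: MapState[String, Boolean] = _    // categories seen so far
  private var total: ValueState[java.lang.Long] = _  // running distinct count

  override def open(parameters: Configuration): Unit = {
    seen = getRuntimeContext.getMapState(
      new MapStateDescriptor[String, Boolean]("seen", classOf[String], classOf[Boolean]))
    total = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long]("total", classOf[java.lang.Long]))
  }

  override def flatMap(value: IncreaseNumPerHour, out: Collector[Long]): Unit = {
    if (!seen.contains(value.category)) {            // first occurrence of this category
      seen.put(value.category, true)
      val t = if (total.value() == null) 1L else total.value() + 1L
      total.update(t)
      out.collect(t)                                 // the curve only ever goes up
    }
  }
}
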
On July 20, 2020 at 11:47, shizk233 <wangwangdaxian...@gmail.com> wrote:
Hi,

Does the total only accumulate within a single day? If so, you could open a one-day Tumbling
Window and use a ContinuousEventTimeTrigger to fire and emit the result every hour.
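
A minimal sketch of that suggestion, assuming the per-category IncreaseNumPerHour events from the code later in the thread (the summing aggregate below is illustrative, not the CountAgg from that code):

// one-day tumbling window that fires every hour, so each firing emits the
// running total accumulated so far within the day
val cumulative = num.keyBy(_.category)
  .timeWindow(Time.days(1))
  .trigger(ContinuousEventTimeTrigger.of(Time.hours(1)))
  .aggregate(new AggregateFunction[IncreaseNumPerHour, Long, Long] {
    override def createAccumulator(): Long = 0L
    override def add(value: IncreaseNumPerHour, acc: Long): Long = acc + value.num
    override def getResult(acc: Long): Long = acc
    override def merge(a: Long, b: Long): Long = a + b
  })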

Best,
shizk233

罗显宴 <15927482...@163.com> wrote on Monday, July 20, 2020 at 1:18 AM:


Hi everyone, how do I get a continuously increasing element count across tumbling windows? For example, a cumulative hourly count of user logins: 500 logins by 1 AM, a cumulative 1,200 by 2 AM, and so on at 3 AM, producing a monotonically increasing curve. I've seen SQL for this written by 云邪 online, but I don't know how to write it with the DataStream API. I'd appreciate it if someone could clear this up for me, thanks!

package com.bupt.main

import java.sql.Timestamp
import java.util.Properties

import org.apache.flink.api.common.functions.{AggregateFunction, MapFunction}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.{ListStateDescriptor, MapState, MapStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.util.Collector



// Raw drug record parsed from a CSV line in Kafka
case class Drug(id: String, ATCCode: String, MIMSType: String,
                name: String, other: String, producer: String,
                retailPrice: String, composition: String,
                medicineRank: String, timestamp: Long)

// One event per crawled drug, keyed later by MIMS category
case class IncreaseNumPerHour(category: String, num: Long, timestamp: Long)

// Marker that a category appeared in the window ending at windowEnd
case class ItemViewCount(category: String, windowEnd: Long)

// Final output: the count at a given point in time
case class IncreasePerHour(time: String, num: Long)

object KafkaToElasticSearch {
  def main(args: Array[String]): Unit = {
    // 1. Create the execution environment and use event time
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val props = new Properties
    props.put("bootstrap.servers", "master:9092")
    props.put("zookeeper.connect", "master:2181")
    props.put("group.id", "drug-group")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("auto.offset.reset", "earliest")

    // 2. Read drug records from Kafka; this topic must match the producer utility's topic
    val drugs = env.addSource(new FlinkKafkaConsumer011[String]("drugs",
        new SimpleStringSchema, props)).setParallelism(1)
      .filter(string => {
        // keep only well-formed 10-field lines with a non-empty MIMS type
        val words = string.split(",")
        words.length == 10 && words(2).length != 0
      })
      .map(new MapFunction[String, Drug] {
        override def map(value: String): Drug = {
          val words = value.split(",")
          if (words.length != 10)
            println(words.mkString(",")) // debug output for malformed lines
          Drug(words(0), words(1), words(2), words(3), words(4),
            words(5), words(6), words(7), words(8), words(9).trim.toLong)
        }
      }).assignAscendingTimestamps(_.timestamp)
//    drugs.print()
    // 3. Normalize the MIMS type: strip a leading '-' and a trailing ';',
    //    then keep only the first word as the category
    val num = drugs.map(drug => {
      val temp: StringBuilder = new StringBuilder(drug.MIMSType)

      if (temp.length != 0 && temp.charAt(0) == '-')
        temp.deleteCharAt(0)
      if (temp.length != 0 && temp.charAt(temp.length - 1) == ';')
        temp.deleteCharAt(temp.length - 1)
      var validateResult: String = null
      if (temp.length != 0) {
        val s = temp.toString
        val space = s.indexOf(' ')
        // guard against substring(0, -1) when the type contains no space
        validateResult = if (space >= 0) s.substring(0, space) else s
      }
      IncreaseNumPerHour(validateResult, 1L, drug.timestamp)
    })

//    num.print()
    // 4. Hourly window per category with 20-second early firings; the
    //    pre-aggregation collapses each (category, window) to one record,
    //    which IncreaseItems then counts per window end
    val result = num.keyBy(_.category)
      .timeWindow(Time.hours(1))
      .trigger(ContinuousEventTimeTrigger.of(Time.seconds(20)))
      .aggregate(new CountAgg, new WindowResult)
      .keyBy(_.windowEnd)
      .process(new IncreaseItems)
    result.print()

    env.execute("compute num")
  }

}

// Custom pre-aggregation function.
// Type parameters: input, accumulator, output; the accumulator is effectively
// the window state. Every method returns 1 so that each (category, window)
// pair collapses into a single record.
class CountAgg() extends AggregateFunction[IncreaseNumPerHour, Long, Long] {
  // consume one input element and update the state
  override def add(value: IncreaseNumPerHour, accumulator: Long): Long = 1L

  // create the initial state
  override def createAccumulator(): Long = 1L

  // expose the state as the output
  override def getResult(accumulator: Long): Long = accumulator

  // merge two partial states
  override def merge(a: Long, b: Long): Long = 1L
}

// Custom window function: emits one ItemViewCount per (category, window)
class WindowResult() extends WindowFunction[Long, ItemViewCount, String, TimeWindow] {
  override def apply(key: String, window: TimeWindow, input: Iterable[Long],
                     out: Collector[ItemViewCount]): Unit = {
    out.collect(ItemViewCount(key, window.getEnd))
  }
}


// Custom process function: for each window end, counts how many distinct
// categories were collected and emits that number
class IncreaseItems() extends KeyedProcessFunction[Long, ItemViewCount, IncreasePerHour] {

  private var itemState: MapState[ItemViewCount, Int] = _

  override def open(parameters: Configuration): Unit = {
    itemState = getRuntimeContext.getMapState(new MapStateDescriptor[ItemViewCount, Int](
      "item-state", TypeInformation.of(classOf[ItemViewCount]), TypeInformation.of(classOf[Int])))
    //itemState = getRuntimeContext.getListState(new ListStateDescriptor[ItemViewCount]("item-state", classOf[ItemViewCount]))
  }

  override def processElement(value: ItemViewCount,
                              ctx: KeyedProcessFunction[Long, ItemViewCount, IncreasePerHour]#Context,
                              out: Collector[IncreasePerHour]): Unit = {
    // store each record in the map state; the map de-duplicates repeated categories
    itemState.put(value, 1)
    // register a timer for just after the window end
    ctx.timerService().registerEventTimeTimer(value.windowEnd + 1)
  }

  // when the timer fires, count the collected entries and emit the result
  override def onTimer(timestamp: Long,
                       ctx: KeyedProcessFunction[Long, ItemViewCount, IncreasePerHour]#OnTimerContext,
                       out: Collector[IncreasePerHour]): Unit = {
    var i: Long = 0L
    import scala.collection.JavaConversions._
    for (item <- itemState.keys()) {
      i += 1
    }

    Thread.sleep(1000) // note: blocks the task thread; acceptable only for local debugging
    out.collect(IncreasePerHour(new Timestamp(timestamp - 1).toString, i))
  }
}

