Error when restarting a Flink SQL job from a savepoint after adding new operators

2020-06-07 Post
The original SQL job was:
CREATE TABLE A_source(...)
CREATE TABLE B_sink (...)
INSERT INTO B_sink
SELECT
 1
FROM 
A_source
;
After taking a savepoint of this Flink SQL job, I modified it to:


CREATE TABLE A_source(...)
CREATE TABLE B_sink (...)
CREATE TABLE C_source(...)
CREATE TABLE D_sink (...)
INSERT INTO B_sink
SELECT
 1
FROM 
A_source
;


INSERT INTO D_sink
SELECT
 1
FROM 
C_source
;
and submitted it from the savepoint. The result was:

Cannot map checkpoint/savepoint state for operator 2e9c6b0c053878cef673bbe7d94ab037 to the new program, because the operator is not available in the new program.
If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.


What is the underlying reason that the operator can no longer be matched?
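One plausible explanation, not verified against this exact job: when operator IDs are not pinned explicitly (as far as we know, Flink SQL in these versions offers no per-operator .uid()), Flink derives each operator's ID from the structure of the job graph, including the order in which nodes are visited. Adding a second INSERT pipeline adds nodes, which can shift the IDs of the original operators so the savepoint state no longer maps. The sketch below is a deliberately simplified toy model of that idea, not Flink's actual hashing algorithm; the object, method and mixing formula are hypothetical.

```scala
import scala.collection.mutable

// Toy model only (hypothetical, NOT Flink's real hasher): an auto-generated
// operator ID combines the node's breadth-first visit position with its inputs' IDs.
object OperatorIdToyModel {
  def assignIds(sources: Seq[String], edges: Map[String, Seq[String]]): Map[String, Int] = {
    // Reverse the edges so every node knows its inputs
    val inputs: Map[String, Seq[String]] =
      edges.toSeq
        .flatMap { case (from, tos) => tos.map(to => to -> from) }
        .groupBy(_._1)
        .map { case (to, pairs) => to -> pairs.map(_._2) }

    val ids = mutable.LinkedHashMap.empty[String, Int]
    val queue = mutable.Queue(sources: _*)
    val seen = mutable.Set(sources: _*)
    while (queue.nonEmpty) {
      val node = queue.dequeue()
      val position = ids.size // how many nodes were visited before this one
      val inputHash = inputs.getOrElse(node, Nil).flatMap(n => ids.get(n)).foldLeft(0)(_ ^ _)
      ids(node) = 31 * position + inputHash // toy mixing function
      for (next <- edges.getOrElse(node, Nil) if !seen(next)) {
        seen += next
        queue.enqueue(next)
      }
    }
    ids.toMap
  }
}
```

In this model A_source keeps its ID, but B_sink is visited later once C_source joins the traversal, so its ID changes even though its own pipeline is unchanged.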

mvn clean package: Maven dependency resolution fails

2019-12-30 Post
Hi all,
  I want to package the project, but running mvn clean package throws the exception below.
Which repository should I use?
>>  
[ERROR] Failed to execute goal on project flink-dist_2.11: Could not resolve 
dependencies for project org.apache.flink:flink-dist_2.11:jar:1.8-SNAPSHOT: The 
following artifacts could not be resolved: 
org.apache.flink:flink-examples-streaming-state-machine_2.11:jar:1.8-SNAPSHOT, 
org.apache.flink:flink-examples-streaming-twitter_2.11:jar:1.8-SNAPSHOT, 
org.apache.flink:flink-ml-uber_2.11:jar:1.8-SNAPSHOT: Failure to find 
org.apache.flink:flink-examples-streaming-state-machine_2.11:jar:1.8-SNAPSHOT 
in http://10.139.32.180:8080/nexus/content/groups/public was cached in the 
local repository, resolution will not be reattempted until the update interval 
of nexus has elapsed or updates are forced -> [Help 1]





Does Flink SQL 1.9 support dimension table joins?

2019-12-04 Post
Can Flink SQL 1.9 currently join against an external data source purely through SQL?
For example, define an HBase dimension table with a SQL DDL statement and then join it with a SQL JOIN:


CREATE TABLE MyUserTable (
  hbase_rowkey_name rowkey_type,
  hbase_column_family_name1 ROW<...>,
  hbase_column_family_name2 ROW<...>
) WITH (
  'connector.type' = 'hbase', ...
)


SELECT
  tb1.xx,
  MyUserTable.xx
FROM
  tb1
JOIN
  MyUserTable ON
  tb1.xx = MyUserTable.xx
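Conceptually, the join asked about here is a lookup (dimension-table) join: each streaming row from tb1 triggers a point lookup of MyUserTable by the join key. The sketch below only models those semantics in plain Scala, with an in-memory Map standing in for the HBase table; the row types and field names are hypothetical, and it says nothing about which Flink versions or planners support this in SQL.

```scala
// Hypothetical row types: tb1 rows and MyUserTable rows share the join key `xx`
final case class Tb1Row(xx: String, amount: Int)
final case class UserRow(xx: String, name: String)

object LookupJoinSketch {
  // Inner lookup join: every stream row looks up the dimension row by key;
  // stream rows without a match are dropped, as with an inner SQL JOIN.
  def join(stream: Seq[Tb1Row], dimension: Map[String, UserRow]): Seq[(Tb1Row, UserRow)] =
    stream.flatMap(row => dimension.get(row.xx).map(user => (row, user)))
}
```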

Reducing parallelism when restarting from a savepoint

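2019-10-09 Post
Because of resource constraints, I want to take a savepoint of a running job and restart it from that savepoint with a lower parallelism. I couldn't find anything in the documentation about reducing parallelism, so I'd like to ask whether this is possible.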
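To our understanding, restarting from a savepoint with a lower parallelism works for keyed state because Flink splits that state into a fixed number of key groups (the job's max parallelism) and gives each subtask a contiguous range of them; lowering the parallelism just hands each remaining subtask a larger range. The sketch mirrors the range arithmetic as we understand it from Flink's KeyGroupRangeAssignment. The key-to-group hash is simplified (Flink applies a murmur hash first), the object name is hypothetical, and the new parallelism must not exceed the max parallelism.

```scala
object KeyGroupSketch {
  // Which subtask owns a key group (same arithmetic as, to our understanding,
  // KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup).
  def operatorIndexForKeyGroup(maxParallelism: Int, parallelism: Int, keyGroup: Int): Int =
    keyGroup * parallelism / maxParallelism

  // Simplified key -> key group mapping; Flink hashes key.hashCode with murmur first.
  def keyGroupForKey(key: Any, maxParallelism: Int): Int =
    Math.floorMod(key.hashCode, maxParallelism)

  // Inclusive key-group range owned by each subtask, ordered by subtask index.
  def ranges(maxParallelism: Int, parallelism: Int): Seq[(Int, Int)] =
    (0 until maxParallelism)
      .groupBy(kg => operatorIndexForKeyGroup(maxParallelism, parallelism, kg))
      .toSeq
      .sortBy(_._1)
      .map { case (_, groups) => (groups.min, groups.max) }
}
```

With max parallelism 128, going from 4 subtasks to 2 merges the ranges (0, 31) and (32, 63) into (0, 63), and so on, so every key group still has exactly one owner.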

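Re: Kafka consumer parallelism differs from the number of Kafka partitions

2019-09-23 Post
Hi,
1. "When the Kafka consumer parallelism is greater than the number of Kafka partitions, what happens to the extra parallel instances?" I take this to mean: when the number of Flink slots exceeds the number of Kafka partitions, the extra slots (really, the Kafka consumers created in those slots) run idle.
For example, with 10 Kafka partitions but 15 Flink slots, 5 of the subtasks are idle.

2. "When the Kafka consumer parallelism equals the number of Kafka partitions but some partitions have no data, what happens to the consumer threads for the empty partitions? How does this affect barrier and watermark generation?"

I'm not sure about the threads and barriers. As for watermarks, the operator should check the status of each input channel: the channel coming from an idle subtask should be marked IDLE, so it is excluded when the minimum watermark across channels is taken and sent to the downstream operator.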
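The watermark behavior described in answer 2 can be illustrated with a simplified model: an operator with several input channels forwards the minimum watermark of its active channels, and channels marked idle are left out. This is a plain-Scala illustration, not Flink's implementation; the types and names are hypothetical.

```scala
// Hypothetical input-channel state: last watermark seen and whether the channel is idle
final case class ChannelStatus(watermark: Long, idle: Boolean)

object IdleAwareWatermark {
  // Minimum watermark over the non-idle channels; None if every channel is idle,
  // in which case the operator has no watermark to forward.
  def combined(channels: Seq[ChannelStatus]): Option[Long] = {
    val active = channels.filterNot(_.idle)
    if (active.isEmpty) None else Some(active.map(_.watermark).min)
  }
}
```

Without the idle flag, a stalled channel (for example one stuck at watermark 5) would hold the combined watermark back for every downstream operator.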






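On 2019-09-24 10:06:04, "gaofeilong198...@163.com" wrote:
>1. When the Kafka consumer parallelism is greater than the number of Kafka partitions, what happens to the extra parallel instances?
>2. When the Kafka consumer parallelism equals the number of Kafka partitions but some partitions have no data, what happens to the consumer threads for the empty partitions? How does this affect barrier and watermark generation?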
>
>
>
>gaofeilong198...@163.com
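For question 1, the idle subtasks follow from how partitions are spread over consumer subtasks. The sketch models round-robin assignment after our understanding of Flink's KafkaTopicPartitionAssigner (a topic-dependent start index, then one partition per subtask in turn); treat the exact formula as an assumption. The point it shows is that each partition goes to exactly one subtask, so subtasks beyond the partition count receive nothing.

```scala
object KafkaPartitionAssignmentSketch {
  // Subtask index for one partition: topic-dependent start, then round-robin.
  def assign(topic: String, partition: Int, numSubtasks: Int): Int = {
    val startIndex = ((topic.hashCode * 31) & 0x7FFFFFFF) % numSubtasks
    (startIndex + partition) % numSubtasks
  }

  // How many partitions each subtask reads (0 means the subtask is idle).
  def partitionsPerSubtask(topic: String, numPartitions: Int, numSubtasks: Int): Map[Int, Int] = {
    val assigned = (0 until numPartitions).groupBy(p => assign(topic, p, numSubtasks))
    (0 until numSubtasks).map(s => s -> assigned.get(s).map(_.size).getOrElse(0)).toMap
  }
}
```

With 10 partitions and 15 subtasks (the numbers from the reply), 10 subtasks get one partition each and 5 get none.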


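How to measure data processing delay

2019-09-05 Post
Hi all,
  I'd like to add a metric to a Flink job that measures data processing delay. My current idea is to subtract the time carried by the record (its event time) from the current time at the operator (System.currentTimeMillis()) and treat the result as the record's processing delay. I'm not sure whether this idea is correct and feasible; I'd appreciate any ideas or suggestions.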
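A minimal sketch of the computation described above, assuming the event time and the TaskManager clock are both epoch milliseconds and reasonably synchronized. The names are hypothetical; in a real job the value would be reported through the operator's metric group (for example as a gauge or histogram), which is omitted here. Note that the result also includes any time the record spent before reaching Flink, for example while sitting in Kafka.

```scala
object ProcessingDelaySketch {
  // Delay of one record at this operator: wall-clock now minus the record's event time.
  def delayMillis(eventTimeMillis: Long, nowMillis: Long = System.currentTimeMillis()): Long =
    nowMillis - eventTimeMillis

  // Small running summary standing in for a real metric (hypothetical helper).
  final class DelayStats {
    private var count = 0L
    private var sum = 0L
    private var maxSeen = Long.MinValue

    def record(delay: Long): Unit = {
      count += 1
      sum += delay
      maxSeen = math.max(maxSeen, delay)
    }

    def mean: Double = if (count == 0) 0.0 else sum.toDouble / count
    def max: Option[Long] = if (count == 0) None else Some(maxSeen)
  }
}
```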

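Re:Re: How to optimize Flink memory usage?

2019-09-05 Post
Hi,
  I ran into a similar situation in a project; here are my thoughts.
  Initially we needed to count each user's browse events within a 90-day time window: a browse event within the last 30 days adds 1, while a user with no browse events in the last 30 days but other browse events between 30 and 90 days ago gets a count of 0 (a rather unusual requirement). We used a sliding window (size 90 days, slide 1 day, with a trigger that fires a computation for every incoming record). Because we needed the window end time, we first did the aggregation with a ProcessWindowFunction, which meant each of the 90 windows had to buffer all of its raw records instead of a single aggregate. After two days in production the checkpoint size had jumped to 20 GB, and soon the job ran out of memory (OOM). On reflection, the window-assignment logic of Flink's sliding window (SlidingEventTimeWindows) is not tied to the window operator and can be reused directly: we call it in a flatMap for each record to get the end times of the 90-day windows that record belongs to, then keyBy and use a ProcessFunction, in which a custom ValueState aggregates the data. Inside the ProcessFunction we can also access the TimerService to register timers that clean up expired state, and clear that state in the onTimer callback.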
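The per-record window assignment described above can be sketched in plain Scala. The arithmetic mirrors, to our understanding, how SlidingEventTimeWindows finds the windows containing a timestamp: align to the slide (shifted by the offset), then step back one slide at a time while the window still covers the timestamp. The object name is hypothetical.

```scala
object SlidingWindowEnds {
  // Start of the slide-aligned window containing `timestamp`, shifted by `offset`.
  def windowStart(timestamp: Long, offset: Long, slide: Long): Long =
    timestamp - (timestamp - offset + slide) % slide

  // End times of every sliding window (length `size`, step `slide`) that contains `timestamp`.
  def windowEnds(timestamp: Long, size: Long, slide: Long, offset: Long): List[Long] = {
    val lastStart = windowStart(timestamp, offset, slide)
    Iterator.iterate(lastStart)(_ - slide)
      .takeWhile(_ > timestamp - size) // the window [start, start + size) must still cover timestamp
      .map(_ + size)
      .toList
  }
}
```

For a 90-day window with a 1-day slide and a -8 h offset (day boundaries at midnight UTC+8), every record yields exactly 90 window ends, which become the keyBy keys together with the user ID.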
 That's my approach; I hope it helps.




Best regards





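On 2019-09-05 13:43:00, "Yifei Qi" wrote:
>Do you mean implementing the sliding-window functionality yourself?
>
>戴嘉诚 wrote on Wed, Sep 4, 2019 at 10:51 PM:
>
>> I'd suggest using a keyed process function and managing and aggregating the data with state inside it; that saves a large amount of memory.
>>
>> Yifei Qi wrote on Wed, Sep 4, 2019 at 20:07: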
>>
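>> > Hi all,
>> >
>> > Has anyone run into excessive memory consumption when using Flink?
>> >
>> > We have recently been using Flink to compute some real-time statistics, but memory consumption is very high. Does anyone know how to optimize it?
>> >
>> > Here are the details:
>> >
>> > The test data simulates 50,000 records from 30,000 users over one day; the raw data totals 100 MB.
>> >
>> > The data is grouped by user.
>> >
>> > Two sliding-window computations: one over the last 1 hour, sliding every 5 seconds, and one over the last 24 hours, sliding every 1 minute.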
>> >
>> >
>> >
>> >
>> >
>> > With Flink running on 3 nodes, total memory usage reached 5 GB.
>> >
>> > Is Flink simply this memory-hungry, or am I using it incorrectly?
>> >
>> > Best regards
>> >
>> >
>> > --
>> >
>> >
>> > Qi Yifei
>> > about.me/qyf404
>> >
>>
>
>


Re: FlinkKafkaProducer with exactly-once enabled: timeout while initializing transactional state

2019-09-01 Post




2019-09-02 10:24:28,599 INFO  org.apache.flink.runtime.taskmanager.Task 
- Interval Join -> Sink: Unnamed (1/4) 
(e8b85b6f144879efbb0b4209f226c69b) switched from RUNNING to FAILED.
org.apache.kafka.common.errors.TimeoutException: Timeout expired while 
initializing transactional state in 6ms.



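On 2019-09-02 11:29:35, "陈赋赟" wrote:

I use a Kafka producer in Flink with exactly-once semantics enabled. The first deployment in the test environment started normally. When I later released a new version, I killed the previous job and redeployed it, and then the following problem appeared: the log shows a timeout exception while initializing transactional state during a checkpoint.
The exception is as follows:


Checkpoint interval: 30 s
Producer transaction timeout (transaction.timeout.ms): 5 minutes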




 

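FlinkKafkaProducer with exactly-once enabled: timeout while initializing transactional state

2019-09-01 Post
I use a Kafka producer in Flink with exactly-once semantics enabled. The first deployment in the test environment started normally. When I later released a new version, I killed the previous job and redeployed it, and then the following problem appeared: the log shows a timeout exception while initializing transactional state during a checkpoint.
The exception is as follows:


Checkpoint interval: 30 s
Producer transaction timeout (transaction.timeout.ms): 5 minutes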
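For reference, the settings from the post can be written down as producer properties with a basic consistency check. The sketch uses only java.util.Properties; the broker address is a placeholder, the object name is hypothetical, and the 15-minute broker limit is Kafka's default for transaction.max.timeout.ms, which a given cluster may override. It illustrates the configuration and does not diagnose the timeout in the log above.

```scala
import java.util.Properties

object TransactionalProducerConfigSketch {
  val checkpointIntervalMs: Long = 30L * 1000     // from the post: checkpoint every 30 s
  val transactionTimeoutMs: Long = 5L * 60 * 1000 // from the post: transaction.timeout.ms = 5 min
  val brokerMaxTimeoutMs: Long = 15L * 60 * 1000  // Kafka broker default transaction.max.timeout.ms

  // Producer properties carrying the transaction timeout (bootstrapServers is a placeholder).
  def producerProperties(bootstrapServers: String): Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", bootstrapServers)
    props.setProperty("transaction.timeout.ms", transactionTimeoutMs.toString)
    props
  }

  // A transaction stays open across at least one checkpoint interval, and the broker
  // rejects producer transaction timeouts above its transaction.max.timeout.ms.
  def timeoutIsConsistent: Boolean =
    transactionTimeoutMs > checkpointIntervalMs && transactionTimeoutMs <= brokerMaxTimeoutMs
}
```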

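Periodically cleared state, but checkpoint size keeps growing

2019-08-29 Post
Hi everyone. I have some questions about managing state with a Flink ProcessFunction. In the stream I use a flatMap to assign several endTimes to each record, then keyBy on the record's key + endTime. In the ProcessFunction's open() I initialize a ValueState, in processElement() I register a timer that clears the state, and in onTimer() I clear the state. My understanding is that if the state is cleared periodically, the checkpoint size should stay at a roughly stable level, but the job has now been running for 8 days and the checkpoint size keeps growing; it has reached 1.6 GB. So I suspect something is wrong with my state-management logic. Could you take a look? The code is below.


Job and ProcessFunction code: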
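// Imports the snippet relies on (the json4s Jackson backend is an assumption)
import java.text.SimpleDateFormat
import scala.collection.JavaConverters._
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

// Event type, inferred from how it is constructed and used below
case class RichAppViewEvent(key: String, distinctId: String, event: String, time: Long, windowEndTime: Long)

// env, consumer, startFromTimestamp and HBaseOutputFormat are defined elsewhere in the job;
// extract[...] also needs an implicit json4s Formats (e.g. DefaultFormats) in scope.
val stream = env.addSource(consumer).uid("DianDianUserAppViewCount_Source")
  .map(rawEvent => {
    try {
      val eventJson = parse(rawEvent)

      if ((eventJson \ "type").extract[String] == "track" &&
        (eventJson \ "project_id").extract[Int] == 6 &&
        (eventJson \ "properties" \ "$is_login_id").extract[Boolean] &&
        (eventJson \ "time").extract[Long] >= startFromTimestamp.toLong)
        Some(eventJson)
      else
        None
    } catch {
      case _: Exception => None // drop records that cannot be parsed
    }
  }).uid("DianDianUserAppViewCount_Map")
  .filter(item => item.isDefined).uid("DianDianUserAppViewCount_Filter")
  .flatMap { item =>
    val timestamp = (item.get \ "time").extract[Long]
    // Reuse the sliding-window assigner: the 90 windows (1-day slide, -8 h offset) containing this record
    SlidingEventTimeWindows
      .of(Time.days(90), Time.days(1), Time.hours(-8))
      .assignWindows(null, timestamp, null)
      .asScala // assignWindows returns a java.util.Collection
      .map { timeWindow =>
        RichAppViewEvent(
          s"${(item.get \ "distinct_id").extract[String]}_${timeWindow.getEnd}",
          (item.get \ "distinct_id").extract[String],
          (item.get \ "event").extract[String],
          timestamp,
          timeWindow.getEnd
        )
      }
  }.uid("DianDianUserAppViewCount_FlatMap")
  .assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor[RichAppViewEvent](Time.hours(1)) {
      override def extractTimestamp(element: RichAppViewEvent): Long = element.time
    }).uid("DianDianUserAppViewCount_Watermarker")
  .keyBy(_.key)
  .process(new ProcessAppViewCount).uid("DianDianUserAppViewCount_Process")
  .writeUsingOutputFormat(new HBaseOutputFormat).uid("DianDianUserAppViewCount_Sink")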


class ProcessAppViewCount extends KeyedProcessFunction[String, RichAppViewEvent, (String, String, Int)] {

  // 30 days in milliseconds: 1000 * 60 * 60 * 24 * 30
  private val thirtyDaysForTimestamp = 30L * 24 * 60 * 60 * 1000
  private val dateFormat = new SimpleDateFormat("MMdd")

  private lazy val appViewCount: ValueState[Int] = getRuntimeContext.getState(
    new ValueStateDescriptor[Int]("AppViewCountState", classOf[Int]))

  override def processElement(value: RichAppViewEvent,
                              ctx: KeyedProcessFunction[String, RichAppViewEvent, (String, String, Int)]#Context,
                              out: Collector[(String, String, Int)]): Unit = {
    if (!isLate(value, ctx)) {
      // Go back 30 days from the window end time
      val beforeThirtyDaysStartTime = value.windowEndTime - thirtyDaysForTimestamp

      // Increment if the record is within the 30 days before the window end and is an $AppViewScreen event
      var currentValue = appViewCount.value()

      if (value.time >= beforeThirtyDaysStartTime &&
        value.event.equals("$AppViewScreen")) {
        currentValue = currentValue + 1
        appViewCount.update(currentValue)
      }

      // Emit downstream
      out.collect((value.distinctId, dateFormat.format(value.windowEndTime - 1), currentValue))

      // Register the cleanup timer
      registerCleanupTimer(value, ctx)
    }
  }

  override def onTimer(timestamp: Long,
                       ctx: KeyedProcessFunction[String, RichAppViewEvent, (String, String, Int)]#OnTimerContext,
                       out: Collector[(String, String, Int)]): Unit = {
    if (appViewCount != null) {
      appViewCount.clear()
    }
  }

  /**
   * If the current watermark has already passed the end of the window assigned to this record,
   * the record is treated as late data and is not processed.
   * @param value the incoming event
   * @param ctx   the process-function context
   * @return true if the record is late
   */
  private def isLate(value: RichAppViewEvent,
                     ctx: KeyedProcessFunction[String, RichAppViewEvent, (String, String, Int)]#Context): Boolean =
    ctx.timerService().currentWatermark() >= value.windowEndTime

  /**
   * Use the window end time as the timestamp of the cleanup timer.
   * @param value the incoming event
   * @param ctx   the process-function context
   */
  private def registerCleanupTimer(value: RichAppViewEvent,
                                   ctx: KeyedProcessFunction[String, RichAppViewEvent, (String, String, Int)]#Context): Unit = {
    val cleanupTime = value.windowEndTime
    ctx.timerService().registerEventTimeTimer(cleanupTime)
  }
}
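The toy model below (plain Scala, not Flink code; the class is hypothetical) mimics the cleanup logic of ProcessAppViewCount: per-key state is created on update, a timer is registered at windowEndTime, and the state is cleared only when the watermark reaches that time. One thing it makes visible is that both a key's state and its pending timer (Flink checkpoints registered timers too) stay alive until the watermark passes that key's window end, which with 90-day windows can be up to 90 days after the record's event time. This is one factor worth checking, not a diagnosis of the growth.

```scala
import scala.collection.mutable

// Toy single-threaded model (hypothetical, NOT Flink code) of per-key ValueState
// plus event-time cleanup timers, mimicking ProcessAppViewCount.
final class KeyedCountWithCleanup {
  private val state = mutable.Map.empty[String, Int]                // stands in for ValueState[Int]
  private val timers = mutable.Map.empty[Long, mutable.Set[String]] // timer time -> keys

  def processElement(key: String, windowEnd: Long, isView: Boolean, watermark: Long): Unit = {
    if (watermark < windowEnd) { // same condition as !isLate
      if (isView) state(key) = state.getOrElse(key, 0) + 1
      timers.getOrElseUpdate(windowEnd, mutable.Set.empty[String]) += key // registerEventTimeTimer(windowEnd)
    }
  }

  // Fire every timer at or below the new watermark and clear the state of its keys.
  def advanceWatermark(watermark: Long): Unit = {
    val due = timers.keys.filter(_ <= watermark).toList
    for (t <- due; key <- timers.remove(t).getOrElse(mutable.Set.empty[String])) state.remove(key)
  }

  def liveKeys: Int = state.size
  def pendingTimers: Int = timers.values.map(_.size).sum
}
```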