Re: flink 1.10 将流设置为 idle

2021-01-11 文章 Akisaya
图片好像发送不出去,这里贴一下代码



StreamSourceContexts#getSourceContext

public static  SourceFunction.SourceContext getSourceContext(
  TimeCharacteristic timeCharacteristic,
  ProcessingTimeService processingTimeService,
  Object checkpointLock,
  StreamStatusMaintainer streamStatusMaintainer,
  Output> output,
  long watermarkInterval,
  long idleTimeout) {




StreamSource#run

this.ctx = StreamSourceContexts.getSourceContext(
   timeCharacteristic,
   getProcessingTimeService(),
   lockingObject,
   streamStatusMaintainer,
   collector,
   watermarkInterval,
   -1);


Akisaya  于2021年1月11日周一 下午7:17写道:

> flink 1.12 的 watermark strategy 重构之后,添加了一个 withIdleness 方法能将一个流在一定时间没有数据产生时设置为
> idle 流
> 但是在 1.10 中没有这样的方法可以设置。
>
> 看了下 1.10 的代码发现在 StreamSourceContext 里是可以根据参数设置 idleTimeout 的,但是在
> StreamSource 的 run 方法里实际使用该方法的时候直接将 idleTimeout 写死成 -1 了
> [image: image.png]
>
> StreamSource
> [image: image.png]
>
> 请问下,为啥不考虑将这个参数开放出来供用户使用。
>
> 我通过继承 flink 内置的 kakfa connector,使用反射修改了 idleTimeout 参数,经验证是可以实现自动检测 kafka
> 流是否有数据并设置其为 idle
>
>


flink 1.10 将流设置为 idle

2021-01-11 文章 Akisaya
flink 1.12 的 watermark strategy 重构之后,添加了一个 withIdleness 方法能将一个流在一定时间没有数据产生时设置为
idle 流
但是在 1.10 中没有这样的方法可以设置。

看了下 1.10 的代码发现在 StreamSourceContext 里是可以根据参数设置 idleTimeout 的,但是在
StreamSource 的 run 方法里实际使用该方法的时候直接将 idleTimeout 写死成 -1 了
[image: image.png]

StreamSource
[image: image.png]

请问下,为啥不考虑将这个参数开放出来供用户使用。

我通过继承 flink 内置的 kakfa connector,使用反射修改了 idleTimeout 参数,经验证是可以实现自动检测 kafka
流是否有数据并设置其为 idle


Re: 根据业务需求选择合适的flink state

2020-12-28 文章 Akisaya
这个可以用 session window 吧
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows

news_...@163.com  于2020年12月28日周一 下午2:15写道:

> 这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。
>
>
>
> news_...@163.com
>
> 发件人: 张锴
> 发送时间: 2020-12-28 13:35
> 收件人: user-zh
> 主题: 根据业务需求选择合适的flink state
> 各位大佬帮我分析下如下需求应该怎么写
>
> 需求说明:
> 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A
>
> 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。
>
> 我的想法:
> 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event Time中的分钟数
> 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。
>
> 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。
>
> flink 版本1.10.1
>


Re: Native Kubernetes 需要访问HDFS

2020-12-21 文章 Akisaya
1.10 版本好像还没有支持,看了下 1.10 代码里创建 cm 的时候没有去读取 hadoop 配置

Yang Wang  于2020年12月19日周六 上午12:18写道:

> 你可以在Flink client端设置HADOOP_CONF_DIR环境变量即可,这样会自动ship
> hadoop的配置并且挂载给JobManager和TaskManager的
>
> Best,
> Yang
>
> liujian <13597820...@qq.com> 于2020年12月18日周五 下午5:26写道:
>
> > Hi:
> >  在使用Native Kubernetes
> > 需要访问HDFS,已经将flink-shaded-hadoop-2-uber-2.8.3-10.0.jar放到lib目录
> >  但是hdfs是HA,那么就需要hdfs-site.xml等文件了,那么是如何指定这个文件呢 
>