Re: flink 1.10 将流设置为 idle
图片好像发送不出去,这里贴一下代码 StreamSourceContexts#getSourceContext public static SourceFunction.SourceContext getSourceContext( TimeCharacteristic timeCharacteristic, ProcessingTimeService processingTimeService, Object checkpointLock, StreamStatusMaintainer streamStatusMaintainer, Output> output, long watermarkInterval, long idleTimeout) { StreamSource#run this.ctx = StreamSourceContexts.getSourceContext( timeCharacteristic, getProcessingTimeService(), lockingObject, streamStatusMaintainer, collector, watermarkInterval, -1); Akisaya 于2021年1月11日周一 下午7:17写道: > flink 1.12 的 watermark strategy 重构之后,添加了一个 withIdleness 方法能将一个流在一定时间没有数据产生时设置为 > idle 流 > 但是在 1.10 中没有这样的方法可以设置。 > > 看了下 1.10 的代码发现在 StreamSourceContext 里是可以根据参数设置 idleTimeout 的,但是在 > StreamSource 的 run 方法里实际使用该方法的时候直接将 idleTimeout 写死成 -1 了 > [image: image.png] > > StreamSource > [image: image.png] > > 请问下,为啥不考虑将这个参数开放出来供用户使用。 > > 我通过继承 flink 内置的 kakfa connector,使用反射修改了 idleTimeout 参数,经验证是可以实现自动检测 kafka > 流是否有数据并设置其为 idle > >
flink 1.10 将流设置为 idle
flink 1.12 的 watermark strategy 重构之后,添加了一个 withIdleness 方法能将一个流在一定时间没有数据产生时设置为 idle 流 但是在 1.10 中没有这样的方法可以设置。 看了下 1.10 的代码发现在 StreamSourceContext 里是可以根据参数设置 idleTimeout 的,但是在 StreamSource 的 run 方法里实际使用该方法的时候直接将 idleTimeout 写死成 -1 了 [image: image.png] StreamSource [image: image.png] 请问下,为啥不考虑将这个参数开放出来供用户使用。 我通过继承 flink 内置的 kakfa connector,使用反射修改了 idleTimeout 参数,经验证是可以实现自动检测 kafka 流是否有数据并设置其为 idle
Re: 根据业务需求选择合适的flink state
这个可以用 session window 吧 https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows news_...@163.com 于2020年12月28日周一 下午2:15写道: > 这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。 > > > > news_...@163.com > > 发件人: 张锴 > 发送时间: 2020-12-28 13:35 > 收件人: user-zh > 主题: 根据业务需求选择合适的flink state > 各位大佬帮我分析下如下需求应该怎么写 > > 需求说明: > 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A > > 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。 > > 我的想法: > 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event Time中的分钟数 > 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。 > > 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。 > > flink 版本1.10.1 >
Re: Native Kubernetes 需要访问HDFS
1.10 版本好像还没有支持,看了下 1.10 代码里创建 cm 的时候没有去读取 hadoop 配置 Yang Wang 于2020年12月19日周六 上午12:18写道: > 你可以在Flink client端设置HADOOP_CONF_DIR环境变量即可,这样会自动ship > hadoop的配置并且挂载给JobManager和TaskManager的 > > Best, > Yang > > liujian <13597820...@qq.com> 于2020年12月18日周五 下午5:26写道: > > > Hi: > > 在使用Native Kubernetes > > 需要访问HDFS,已经将flink-shaded-hadoop-2-uber-2.8.3-10.0.jar放到lib目录 > > 但是hdfs是HA,那么就需要hdfs-site.xml等文件了,那么是如何指定这个文件呢 >