Re: mark kafka stream as idle if no data comes in for a while in flink 1.10

2021-01-11 Thread Akisaya
Thank you @chesnay.

I tried in vain to find the issue about the introduction of the new watermark
strategy. Could you provide some details about it?

Chesnay Schepler wrote on Mon, Jan 11, 2021 at 9:43 PM:

> The idleTimeout you found is from an earlier attempt at implementing
> idleness, but making it configurable was aborted midway through as there
> were some API issues. The effort was subsumed by a new source interface and
> watermark generators that were introduced in 1.12.
>
> Some more details can be found in FLINK-5018
> <https://issues.apache.org/jira/browse/FLINK-5018>.
>
> On 1/11/2021 12:40 PM, Akisaya wrote:
>
> Hey there,
> Recently I had to join two streams where one of them may be idle for a long
> time. In Flink 1.12, the new WatermarkStrategy has a `withIdleness` method
> to detect whether a stream is idle, so that the operator can still advance
> its watermark from the other, active stream; otherwise the state of this
> operator grows continuously.
>
> But in Flink 1.10 there is no such `withIdleness` method. The Flink 1.10
> docs mention a workaround in
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
> but it doesn't work well.
>
> After walking through the code, I found that StreamSourceContexts#getSourceContext
> takes an idleTimeout parameter, which is hard-coded to -1 in StreamSource#run.
>
> StreamSourceContexts#getSourceContext
>
> public static <OUT> SourceFunction.SourceContext<OUT> getSourceContext(
>   TimeCharacteristic timeCharacteristic,
>   ProcessingTimeService processingTimeService,
>   Object checkpointLock,
>   StreamStatusMaintainer streamStatusMaintainer,
>   Output<StreamRecord<OUT>> output,
>   long watermarkInterval,
>   long idleTimeout) {
>
>
>
> StreamSource#run
>
> this.ctx = StreamSourceContexts.getSourceContext(
>   timeCharacteristic,
>   getProcessingTimeService(),
>   lockingObject,
>   streamStatusMaintainer,
>   collector,
>   watermarkInterval,
>   -1);
>
>
> After extending the Flink Kafka connector and setting the idleness timeout
> via reflection, I found it works as I expected!
>
>
>
> I'm very curious why Flink does not expose this parameter to users to
> determine whether a stream is idle, and what the side effects would be.
>
> thx.
>
>
>
>
>
>


mark kafka stream as idle if no data comes in for a while in flink 1.10

2021-01-11 Thread Akisaya
Hey there,
Recently I had to join two streams where one of them may be idle for a long
time. In Flink 1.12, the new WatermarkStrategy has a `withIdleness` method to
detect whether a stream is idle, so that the operator can still advance its
watermark from the other, active stream; otherwise the state of this operator
grows continuously.
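For reference, Flink 1.12 declares idleness on the watermark strategy rather than on the source. A minimal sketch, assuming an illustrative event type `MyEvent` with a `getTimestamp()` accessor (neither is from a real codebase):

```java
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

// Mark the stream idle after one minute without records, so a downstream
// two-input operator can keep advancing its watermark from the active input.
WatermarkStrategy<MyEvent> strategy =
    WatermarkStrategy
        .<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner((event, ts) -> event.getTimestamp())
        .withIdleness(Duration.ofMinutes(1));
```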

But in Flink 1.10 there is no such `withIdleness` method. The Flink 1.10 docs
mention a workaround in
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
but it doesn't work well.

After walking through the code, I found that StreamSourceContexts#getSourceContext
takes an idleTimeout parameter, which is hard-coded to -1 in StreamSource#run.

StreamSourceContexts#getSourceContext

public static <OUT> SourceFunction.SourceContext<OUT> getSourceContext(
  TimeCharacteristic timeCharacteristic,
  ProcessingTimeService processingTimeService,
  Object checkpointLock,
  StreamStatusMaintainer streamStatusMaintainer,
  Output<StreamRecord<OUT>> output,
  long watermarkInterval,
  long idleTimeout) {



StreamSource#run

this.ctx = StreamSourceContexts.getSourceContext(
  timeCharacteristic,
  getProcessingTimeService(),
  lockingObject,
  streamStatusMaintainer,
  collector,
  watermarkInterval,
  -1);


After extending the Flink Kafka connector and setting the idleness timeout via
reflection, I found it works as I expected!
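The reflection trick can be illustrated in isolation with plain Java. This is a toy stand-in, not Flink's actual classes: a private field initialized to -1 (as StreamSource does for idleTimeout) and then overwritten through reflection, which is essentially what the extended connector does:

```java
import java.lang.reflect.Field;

public class IdleTimeoutDemo {
    // Toy stand-in for the object holding the hard-coded timeout; in Flink
    // the real field lives inside the source context, not in a class like this.
    static class SourceCtx {
        private long idleTimeout = -1L; // -1 means "idleness detection disabled"
        long idleTimeout() { return idleTimeout; }
    }

    public static void main(String[] args) throws Exception {
        SourceCtx ctx = new SourceCtx();
        Field f = SourceCtx.class.getDeclaredField("idleTimeout");
        f.setAccessible(true);   // bypass the private modifier
        f.setLong(ctx, 30_000L); // enable a 30-second idle timeout
        System.out.println(ctx.idleTimeout()); // prints 30000
    }
}
```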



I'm very curious why Flink does not expose this parameter to users to
determine whether a stream is idle, and what the side effects would be.

thx.


Re: Setting a stream to idle in Flink 1.10

2021-01-11 Thread Akisaya
The images don't seem to go through, so I'm pasting the code here.



StreamSourceContexts#getSourceContext

public static <OUT> SourceFunction.SourceContext<OUT> getSourceContext(
  TimeCharacteristic timeCharacteristic,
  ProcessingTimeService processingTimeService,
  Object checkpointLock,
  StreamStatusMaintainer streamStatusMaintainer,
  Output<StreamRecord<OUT>> output,
  long watermarkInterval,
  long idleTimeout) {




StreamSource#run

this.ctx = StreamSourceContexts.getSourceContext(
  timeCharacteristic,
  getProcessingTimeService(),
  lockingObject,
  streamStatusMaintainer,
  collector,
  watermarkInterval,
  -1);


Akisaya wrote on Mon, Jan 11, 2021 at 7:17 PM:

> After the watermark strategy rework in Flink 1.12, a withIdleness method was
> added that can mark a stream as idle when it produces no data for a while.
> But there is no such method in 1.10.
>
> Looking at the 1.10 code, I found that StreamSourceContext can take an
> idleTimeout parameter, but StreamSource's run method hard-codes idleTimeout
> to -1 when it actually uses it.
> [image: image.png]
>
> StreamSource
> [image: image.png]
>
> May I ask why this parameter is not considered for exposure to users?
>
> By extending Flink's built-in Kafka connector and modifying the idleTimeout
> parameter via reflection, I verified that it can automatically detect
> whether the Kafka stream has data and mark it as idle.
>
>


Setting a stream to idle in Flink 1.10

2021-01-11 Thread Akisaya
After the watermark strategy rework in Flink 1.12, a withIdleness method was
added that can mark a stream as idle when it produces no data for a while.
But there is no such method in 1.10.

Looking at the 1.10 code, I found that StreamSourceContext can take an
idleTimeout parameter, but StreamSource's run method hard-codes idleTimeout
to -1 when it actually uses it.
[image: image.png]

StreamSource
[image: image.png]

May I ask why this parameter is not considered for exposure to users?

By extending Flink's built-in Kafka connector and modifying the idleTimeout
parameter via reflection, I verified that it can automatically detect whether
the Kafka stream has data and mark it as idle.


Re: Choosing the right Flink state for the business requirement

2020-12-28 Thread Akisaya
This could probably be done with a session window:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows
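A rough sketch of the suggestion, assuming records keyed by room ID and user ID with event-time timestamps already assigned (all names here are illustrative, not real classes):

```java
// One session per (room, user); a gap longer than the one-minute heartbeat
// interval closes the session.
stream
    .keyBy(r -> r.roomId + "|" + r.userId)
    // 90s > the 1-minute heartbeat interval, so consecutive per-minute
    // records chain into one session while a missed minute splits it.
    .window(EventTimeSessionWindows.withGap(Time.seconds(90)))
    .aggregate(new StayDurationAggregate()); // hypothetical aggregate
```

Since the windows are event-time based, late or out-of-order arrival in Kafka is handled by the watermark rather than by arrival order.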

news_...@163.com wrote on Mon, Dec 28, 2020 at 2:15 PM:

> This only works if records enter Kafka in order, but real scenarios cannot
> guarantee that: an earlier heartbeat record may enter the Kafka queue later.
>
>
>
> news_...@163.com
>
> From: 张锴
> Sent: 2020-12-28 13:35
> To: user-zh
> Subject: Choosing the right Flink state for the business requirement
> Could you help me analyze how to implement the following requirement?
>
> Requirement:
> Our company is in the live-streaming business, and we need to compute users'
> online duration in real time. The source is Kafka, with a live heartbeat
> record produced every minute; the data contains an eventTime field. For
> example, if user A produces heartbeat records in minutes 1, 2 and 3, their
> stay is 3 minutes; if records appear again in minutes 5 and 6, that stay is
> 2 minutes. Consecutive heartbeat records are merged into a single record,
> per room and per user.
>
> My idea:
> My current idea is to key by room ID and user ID and then use a process
> function with state: take the minute from each record's eventTime and
> subtract the previous record's minute. If the difference is 1, the records
> are consecutive, so continue; if not, the session has broken, so emit the
> record immediately and clear the state for the current key.
>
> I'd appreciate any suggestions on this idea, or a better way to handle it.
>
> Flink version: 1.10.1
>
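Independent of Flink, the merging rule described in the quoted proposal (consecutive minutes belong to one stay) can be sketched as plain code; the minute values below stand in for the minutes extracted from eventTime:

```java
import java.util.ArrayList;
import java.util.List;

public class SessionMergeDemo {
    // Given one user's heartbeat minutes in ascending order, return the
    // length in minutes of each run of consecutive values (each run = one stay).
    static List<Integer> stayLengths(int[] minutes) {
        List<Integer> result = new ArrayList<>();
        int runStart = 0;
        for (int i = 1; i <= minutes.length; i++) {
            // Close the current run at end of input or when the gap != 1.
            if (i == minutes.length || minutes[i] - minutes[i - 1] != 1) {
                result.add(minutes[i - 1] - minutes[runStart] + 1);
                runStart = i;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Minutes 1,2,3 then 5,6 -> stays of 3 minutes and 2 minutes,
        // matching the example in the requirement.
        System.out.println(stayLengths(new int[]{1, 2, 3, 5, 6})); // [3, 2]
    }
}
```

Note that this assumes the minutes arrive in order; as pointed out above, out-of-order arrival in Kafka is exactly what makes the pure-state approach fragile.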


Re: Native Kubernetes needs access to HDFS

2020-12-21 Thread Akisaya
This doesn't seem to be supported in 1.10 yet; looking at the 1.10 code, the
Hadoop configuration is not read when the ConfigMap is created.

Yang Wang wrote on Sat, Dec 19, 2020 at 12:18 AM:

> You can simply set the HADOOP_CONF_DIR environment variable on the Flink
> client side; the Hadoop configuration will then be shipped automatically
> and mounted into the JobManager and TaskManager.
>
> Best,
> Yang
>
liujian <13597820...@qq.com> wrote on Fri, Dec 18, 2020 at 5:26 PM:
>
> > Hi:
> >  When using Native Kubernetes I need to access HDFS, and I have already
> > put flink-shaded-hadoop-2-uber-2.8.3-10.0.jar into the lib directory.
> >  But HDFS is HA, so files such as hdfs-site.xml are needed. How can these
> > files be specified?
>