从语义上说,已经有产生 Watermark 的逻辑了,如果 forward 此前的 watermark
在其他一些用户场景下或许也不合适。从另一个角度考虑你也可以把 watermark 带在 element
上,实现 AssignerWithPunctuatedWatermarks 的 Watermark
checkAndGetNextWatermark(T lastElement, long extractedTimestamp); 方法时从
element 取出来

Best,
tison.


tison <wander4...@gmail.com> 于2020年4月16日周四 下午10:36写道:

> 喔,看差了...Flink 原生没有这样的支持,不过可以扩展基础的算子来实现你要的逻辑
>
> 参考 assignTimestampsAndWatermarks
> 的实现,以及 TimestampsAndPunctuatedWatermarksOperator,不要重写 processWatermark
> 方法,应该可以实现。DataStream 方面调用更基础的 transform 方法
>
> 如果你觉得这是一个合理的需求,应该开箱即用,也可以到 Flink 的 JIRA(issue)列表上提
> https://jira.apache.org/jira/projects/FLINK/issues 注册 id 后创建 JIRA 单即可
>
> Best,
> tison.
>
>
> taowang <taow...@deepglint.com> 于2020年4月16日周四 下午10:12写道:
>
>> 感谢回复,但是很抱歉我试了一下发现不可以。
>> 无论是使用了`AssignerWithPeriodicWatermarks`的`getCurrentWatermark`还是`AssignerWithPunctuatedWatermarks`的`checkAndGetNextWatermark`,当它们`return
>> null`时下游算子拿到的水印都显示为`No
>> Watermark`,我在下游算子中打印出`context.currentWatermark()`发现都是`Long.MIN_VALUE`的值。
>> 看了这两个接口文档,不太理解这里的`no new watermark will be
>> generated.`是什么意思,一般理解为不会生成新的水印那之前的水印就应该被继续传下去吧,但实际看来是下游接收不到水印(`no
>> watermark`?)。
>> @tison,你那边是尝试过这种写法吗,如果可以的话,能麻烦给一个简单的demo吗。
>>
>>
>> 感谢帮助!🙏🙏🙏
>> ```
>> public interface AssignerWithPeriodicWatermarks<T> extends
>> TimestampAssigner<T> {
>>
>>  /**
>>  * Returns the current watermark. This method is periodically called by
>> the
>>  * system to retrieve the current watermark. The method may return {@code
>> null} to
>>  * indicate that no new Watermark is available.
>>  *
>>  * <p>The returned watermark will be emitted only if it is non-null and
>> its timestamp
>>  * is larger than that of the previously emitted watermark (to preserve
>> the contract of
>>  * ascending watermarks). If the current watermark is still
>>  * identical to the previous one, no progress in event time has happened
>> since
>>  * the previous call to this method. If a null value is returned, or the
>> timestamp
>>  * of the returned watermark is smaller than that of the last emitted
>> one, then no
>>  * new watermark will be generated.
>>  *
>>  * <p>The interval in which this method is called and Watermarks are
>> generated
>>  * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
>>  *
>>  * @see org.apache.flink.streaming.api.watermark.Watermark
>>  * @see ExecutionConfig#getAutoWatermarkInterval()
>>  *
>>  * @return {@code Null}, if no watermark should be emitted, or the next
>> watermark to emit.
>>  */
>>  @Nullable
>>  Watermark getCurrentWatermark();
>> }
>> ```
>>
>>
>>  原始邮件
>> 发件人: tison<wander4...@gmail.com>
>> 收件人: user-zh<user-zh@flink.apache.org>
>> 发送时间: 2020年4月16日(周四) 20:33
>> 主题: Re: 为消息分配时间戳但不想重新分配水印
>>
>>
>> 在 getCurrentWatermark 里返回 null 就行了,会 forward 此前的 watermark 的。另外语义上使用
>> AssignerWithPunctuatedWatermarks 会更合适一点。 Best, tison. taowang <
>> taow...@deepglint.com> 于2020年4月16日周四 下午5:13写道: > Hello,大家好: > 在flink >
>> stream中我这里遇到一个需求是,想到对上一个算子输出的消息重新分配时间戳,但此时我不想重新分配水印。在从kafka读取消息时我已经添加了水印。 >
>> 为了实现这个功能,我想有两种方法: > 1. 在算子输出后面重新为消息分配水印:看到flink >
>> stream暂时只有`assignTimestampsAndWatermarks`方法,这里面要实现两个接口:`getCurrentWatermark`和`extractTimestamp`。我只想实现`extractTimestamp`而不想管水印相关的`getCurrentWatermark
>> > `这个方法。因为在加水印前多并行度会造成乱序从而使水印增长过快。 > 2. >
>> 在上一个算子输出时指定这个消息的时间戳,但我只看到在`SourceFunction`里才有`collectWithTimestamp`之类的方法,在正常的比如`ProcessFunction`里是只有`collect`方法。
>> > > > 我现在只能使用`assignTimestampsAndWatermarks` >
>> 去重新分配水印,但这限制了我前面的算子并行度都必须设置为1,请问大家有什么好的办法吗? > 感谢解答!
>
>

回复