Re: 1.17.1 - Interval join的时候发生NPE
Hi Hangxiang, 感谢您的回应。 下面是该问题的关键代码,main_stream表是流数据源,数据事件流频约每笔500ms~1s,目前尝试将t1minStream和t5minStream assignTimestampsAndWatermarks(WatermarkStrategy.noWatermarks())是不会产生这问题造成作业失败了,但输出会有数据丢失。 如有其他思路,麻烦你了。 String t1minSql = "SELECT rowtime, key, id, AVG(num) OVER w_t1min AS avg_t1min FROM main_stream WINDOW w_t1min AS (PARTITON BY key ORDER BY rowtime RANGE BETWEEN INTERVAL ‘1’ MINUTES PRECEDING AND CURRENT ROW)"; Table t1minTable = tableEnv.sqlQuery(t1minSql); String t5minSql = "SELECT rowtime, key, id, AVG(num) OVER w_t5min AS avg_t5min FROM main_stream WINDOW w_t5min AS (PARTITON BY key ORDER BY rowtime RANGE BETWEEN INTERVAL ‘5’ MINUTES PRECEDING AND CURRENT ROW)"; Table t5minTable = tableEnv.sqlQuery(t5minSql); DataStream t1minStream = tableEnv.toChangelogStream(t1minTable); DataStream t5minStream = tableEnv.toChangelogStream(t5minTable); DataStream joinedStream = t1minStream.keyBy(new TupleKeySelector("key", "id")).intervalJoin(t5minStream.keyBy(new TupleKeySelector("key", "id"))).inEventTime().between(Time.milliseconds(-1000L), Time.milliseconds(1000L)).process(new ProcessJoinFunction() { @Override public void processElement(Row left, Row right, ProcessJoinFunction.Context ctx, Collector collector) throws Exception { collector.collect(Row.join(left, right)); } }); > Hangxiang Yu 於 2023年9月25日 上午10:54 寫道: > > Hi, 请问下是 SQL 作业还是 DataStream 作业,可以提供一些可复现的关键 SQL 或代码吗 > > On Sat, Sep 23, 2023 at 3:59 PM Phoes Huang wrote: > >> Hi, >> >> 单机本地开发执行,遇到该问题,请问有人遇过并解决吗? >> >> 2023-09-23 13:52:03.989 INFO >> [flink-akka.actor.default-dispatcher-9][Execution.java:1445] - Interval >> Join (19/20) >> (ff8e25fb94208d3c27f549a1e24757ea_e8388ada9c03cfdb1446bb3ccfbd461b_18_0) >> switched from RUNNING to FAILED on d569c5db-6882-496b-9e92-8a40bb631784 @ >> localhost (dataPort=-1). >> java.lang.NullPointerException: null >>at >> org.apache.flink.streaming.api.operators.TimerSerializer.serialize(TimerSerializer.java:149) >> ~[flink-streaming-java-1.17.1.jar:1.17.1] >>at >> org.apache.flink.streaming.api.operators.TimerSerializer.serialize(TimerSerializer.java:39) >> ~[flink-streaming-java-1.17.1.jar:1.17.1] >>at >> org.apache.flink.state.changelog.ChangelogKeyGroupedPriorityQueue.lambda$logRemoval$1(ChangelogKeyGroupedPriorityQueue.java:153) >> ~[flink-statebackend-changelog-1.17.1.jar:1.17.1] >>at >> org.apache.flink.state.changelog.AbstractStateChangeLogger.lambda$serialize$4(AbstractStateChangeLogger.java:184) >> ~[flink-statebackend-changelog-1.17.1.jar:1.17.1] >>at >> org.apache.flink.state.changelog.AbstractStateChangeLogger.serializeRaw(AbstractStateChangeLogger.java:193) >> ~[flink-statebackend-changelog-1.17.1.jar:1.17.1] >>at >> org.apache.flink.state.changelog.AbstractStateChangeLogger.serialize(AbstractStateChangeLogger.java:178) >> ~[flink-statebackend-changelog-1.17.1.jar:1.17.1] >>at >> org.apache.flink.state.changelog.AbstractStateChangeLogger.log(AbstractStateChangeLogger.java:151) >> ~[flink-statebackend-changelog-1.17.1.jar:1.17.1] >>at >> org.apache.flink.state.changelog.AbstractStateChangeLogger.valueElementRemoved(AbstractStateChangeLogger.java:125) >> ~[flink-statebackend-changelog-1.17.1.jar:1.17.1] >>at >> org.apache.flink.state.changelog.ChangelogKeyGroupedPriorityQueue.logRemoval(ChangelogKeyGroupedPriorityQueue.java:153) >> ~[flink-statebackend-changelog-1.17.1.jar:1.17.1] >>at >> org.apache.flink.state.changelog.ChangelogKeyGroupedPriorityQueue.poll(ChangelogKeyGroupedPriorityQueue.java:69) >> ~[flink-statebackend-changelog-1.17.1.jar:1.17.1] >>at >> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:301) >> ~[flink-streaming-java-1.17.1.jar:1.17.1] >>at >> org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:180) >> ~[flink-streaming-java-1.17.1.jar:1.17.1] >>at >> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:602) >> ~[flink-streaming-java-1.17.1.jar:1.17.1] >>at >> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:609) >> ~[flink-streaming-java-1.17.1.jar:1.17.1] >>at >> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark2(AbstractStreamOperator.java:618) >> ~[flink-streaming-java-1.17.1.jar:1.17.1] >>at >> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitWatermark(StreamTwoInputProcessorFactory.java:268) >> ~[flink-streaming-java-1.17.1.jar:1.17.1] >>at >> org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:200) >>
Re: Flink SQL的状态清理
Hi, 可以通过设置 table.exec.state.ttl 来控制状态算子的 state TTL. 更多信息请参阅 [1] [1] https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/concepts/overview/#%e7%8a%b6%e6%80%81%e7%ae%a1%e7%90%86 Best, Jane On Thu, Sep 21, 2023 at 5:17 PM faronzz wrote: > 试试这个 t_env.get_config().set("table.exec.state.ttl", "86400 s") > > > > > | | > faronzz > | > | > faro...@163.com > | > > > 回复的原邮件 > | 发件人 | 小昌同学 | > | 发送日期 | 2023年09月21日 17:06 | > | 收件人 | user-zh | > | 主题 | Flink SQL的状态清理 | > > > 各位老师好,请教一下大家关于flink sql的状态清理问题,我百度的话只找到相关的minbath设置,sql是没有配置state的ttl设置嘛 > | | > 小昌同学 > | > | > ccc0606fight...@163.com > |
Re: 退订
Hi, 请发送任意内容的邮件到 user-zh-unsubscr...@flink.apache.org 地址来取消订阅来自 user-zh@flink.apache.org 邮件组的邮件,你可以参考[1][2] 管理你的邮件订阅。 Please send email to user-zh-unsubscr...@flink.apache.org if you want to unsubscribe the mail from user-zh@flink.apache.org , and you can refer [1][2] for more details. Best, Yunfeng On Mon, Sep 25, 2023 at 10:43 AM 星海 <2278179...@qq.com.invalid> wrote: > > 退订
Re: 1.17.1 - Interval join的时候发生NPE
Hi, 请问下是 SQL 作业还是 DataStream 作业,可以提供一些可复现的关键 SQL 或代码吗 On Sat, Sep 23, 2023 at 3:59 PM Phoes Huang wrote: > Hi, > > 单机本地开发执行,遇到该问题,请问有人遇过并解决吗? > > 2023-09-23 13:52:03.989 INFO > [flink-akka.actor.default-dispatcher-9][Execution.java:1445] - Interval > Join (19/20) > (ff8e25fb94208d3c27f549a1e24757ea_e8388ada9c03cfdb1446bb3ccfbd461b_18_0) > switched from RUNNING to FAILED on d569c5db-6882-496b-9e92-8a40bb631784 @ > localhost (dataPort=-1). > java.lang.NullPointerException: null > at > org.apache.flink.streaming.api.operators.TimerSerializer.serialize(TimerSerializer.java:149) > ~[flink-streaming-java-1.17.1.jar:1.17.1] > at > org.apache.flink.streaming.api.operators.TimerSerializer.serialize(TimerSerializer.java:39) > ~[flink-streaming-java-1.17.1.jar:1.17.1] > at > org.apache.flink.state.changelog.ChangelogKeyGroupedPriorityQueue.lambda$logRemoval$1(ChangelogKeyGroupedPriorityQueue.java:153) > ~[flink-statebackend-changelog-1.17.1.jar:1.17.1] > at > org.apache.flink.state.changelog.AbstractStateChangeLogger.lambda$serialize$4(AbstractStateChangeLogger.java:184) > ~[flink-statebackend-changelog-1.17.1.jar:1.17.1] > at > org.apache.flink.state.changelog.AbstractStateChangeLogger.serializeRaw(AbstractStateChangeLogger.java:193) > ~[flink-statebackend-changelog-1.17.1.jar:1.17.1] > at > org.apache.flink.state.changelog.AbstractStateChangeLogger.serialize(AbstractStateChangeLogger.java:178) > ~[flink-statebackend-changelog-1.17.1.jar:1.17.1] > at > org.apache.flink.state.changelog.AbstractStateChangeLogger.log(AbstractStateChangeLogger.java:151) > ~[flink-statebackend-changelog-1.17.1.jar:1.17.1] > at > org.apache.flink.state.changelog.AbstractStateChangeLogger.valueElementRemoved(AbstractStateChangeLogger.java:125) > ~[flink-statebackend-changelog-1.17.1.jar:1.17.1] > at > org.apache.flink.state.changelog.ChangelogKeyGroupedPriorityQueue.logRemoval(ChangelogKeyGroupedPriorityQueue.java:153) > ~[flink-statebackend-changelog-1.17.1.jar:1.17.1] > at > org.apache.flink.state.changelog.ChangelogKeyGroupedPriorityQueue.poll(ChangelogKeyGroupedPriorityQueue.java:69) > ~[flink-statebackend-changelog-1.17.1.jar:1.17.1] > at > org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:301) > ~[flink-streaming-java-1.17.1.jar:1.17.1] > at > org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:180) > ~[flink-streaming-java-1.17.1.jar:1.17.1] > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:602) > ~[flink-streaming-java-1.17.1.jar:1.17.1] > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:609) > ~[flink-streaming-java-1.17.1.jar:1.17.1] > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark2(AbstractStreamOperator.java:618) > ~[flink-streaming-java-1.17.1.jar:1.17.1] > at > org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitWatermark(StreamTwoInputProcessorFactory.java:268) > ~[flink-streaming-java-1.17.1.jar:1.17.1] > at > org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:200) > ~[flink-streaming-java-1.17.1.jar:1.17.1] > at > org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:115) > ~[flink-streaming-java-1.17.1.jar:1.17.1] > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:148) > ~[flink-streaming-java-1.17.1.jar:1.17.1] > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110) > ~[flink-streaming-java-1.17.1.jar:1.17.1] > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) > ~[flink-streaming-java-1.17.1.jar:1.17.1] > at > org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:85) > ~[flink-streaming-java-1.17.1.jar:1.17.1] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550) > ~[flink-streaming-java-1.17.1.jar:1.17.1] > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) > ~[flink-streaming-java-1.17.1.jar:1.17.1] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839) > ~[flink-streaming-java-1.17.1.jar:1.17.1] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788) >
在使用使用jemalloc内存分配器一段时间后,出现checkpoint 超时,任务卡住的情况
在使用使用jemalloc内存分配器一段时间后,出现checkpoint 超时,任务卡住的情况,哪位遇到过呢?flink版本:flink-1.13.2,jiemalloc版本:5.3.0
After using the jemalloc memory allocator for a period of time, checkpoint timeout occurs and tasks are stuck
After using the jemalloc memory allocator for a period of time, checkpoint timeout occurs and tasks are stuck. Who has encountered this? flink version:1.13.2, jiemalloc version: 5.3.0
flink两阶段提交
请教一下,flink的两阶段提交对于sink算子,预提交是在做检查点的哪个阶段触发的?预提交时具体是做了什么工作?