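Re: 1.17.1 - NPE during interval join

2023-09-24 Post by Phoes Huang
Hi Hangxiang,

Thanks for your reply.
Below is the key code for this problem. The main_stream table is a streaming source that emits roughly one event every 500 ms to 1 s. So far I have tried calling
assignTimestampsAndWatermarks(WatermarkStrategy.noWatermarks()) on both t1minStream and t5minStream. With that change the job no longer fails with this error, but some data is missing from the output.
If you have any other ideas, I would appreciate your help.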

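import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

// 1-minute rolling average per key, ordered by event time (rowtime).
String t1minSql =
    "SELECT rowtime, key, id, AVG(num) OVER w_t1min AS avg_t1min "
        + "FROM main_stream "
        + "WINDOW w_t1min AS (PARTITION BY key ORDER BY rowtime "
        + "RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW)";

Table t1minTable = tableEnv.sqlQuery(t1minSql);

// 5-minute rolling average per key, ordered by event time (rowtime).
String t5minSql =
    "SELECT rowtime, key, id, AVG(num) OVER w_t5min AS avg_t5min "
        + "FROM main_stream "
        + "WINDOW w_t5min AS (PARTITION BY key ORDER BY rowtime "
        + "RANGE BETWEEN INTERVAL '5' MINUTE PRECEDING AND CURRENT ROW)";

Table t5minTable = tableEnv.sqlQuery(t5minSql);

DataStream<Row> t1minStream = tableEnv.toChangelogStream(t1minTable);

DataStream<Row> t5minStream = tableEnv.toChangelogStream(t5minTable);

// TupleKeySelector is a custom KeySelector (its definition is not included here).
DataStream<Row> joinedStream = t1minStream
    .keyBy(new TupleKeySelector("key", "id"))
    .intervalJoin(t5minStream.keyBy(new TupleKeySelector("key", "id")))
    .inEventTime()
    .between(Time.milliseconds(-1000L), Time.milliseconds(1000L))
    .process(new ProcessJoinFunction<Row, Row, Row>() {
        @Override
        public void processElement(Row left, Row right, Context ctx, Collector<Row> collector)
                throws Exception {
            // Emit the 1-minute and 5-minute rows side by side.
            collector.collect(Row.join(left, right));
        }
    });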
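
For reference, the between(-1000 ms, 1000 ms) bounds in the join above mean that, for records sharing a key, a left record pairs with a right record when left.ts - 1000 <= right.ts <= left.ts + 1000 (both bounds are inclusive by default in Flink's interval join). The following is only a minimal plain-Java sketch of that time condition, not Flink code; the class name, method names, and sample timestamps are hypothetical and chosen purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java illustration of the interval-join time condition; not Flink code. */
class IntervalJoinBoundsSketch {

    /** True when leftTs + lowerMs <= rightTs <= leftTs + upperMs (both bounds inclusive). */
    static boolean withinBounds(long leftTs, long rightTs, long lowerMs, long upperMs) {
        return rightTs >= leftTs + lowerMs && rightTs <= leftTs + upperMs;
    }

    /** Returns every {leftTs, rightTs} pair that satisfies the bounds, for a single key. */
    static List<long[]> join(long[] leftTs, long[] rightTs, long lowerMs, long upperMs) {
        List<long[]> pairs = new ArrayList<>();
        for (long l : leftTs) {
            for (long r : rightTs) {
                if (withinBounds(l, r, lowerMs, upperMs)) {
                    pairs.add(new long[] {l, r});
                }
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        // Hypothetical event timestamps in milliseconds for one key.
        long[] left = {10_000L, 20_000L};
        long[] right = {9_000L, 11_001L, 20_500L};
        for (long[] p : join(left, right, -1000L, 1000L)) {
            System.out.println(p[0] + " joins " + p[1]);
        }
    }
}
```

With these sample timestamps only the pairs (10000, 9000) and (20000, 20500) satisfy the condition; 11001 falls one millisecond outside the upper bound of the first left record.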




Re: 1.17.1 - NPE during interval join

2023-09-24 Post by Hangxiang Yu
Hi, is this a SQL job or a DataStream job? Could you share the key SQL or code needed to reproduce the problem?


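1.17.1 - NPE during interval join

2023-09-23 Post by Phoes Huang
Hi,

I ran into this problem while running the job locally on a single machine during development. Has anyone encountered it and found a fix?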

2023-09-23 13:52:03.989 INFO [flink-akka.actor.default-dispatcher-9][Execution.java:1445] - Interval Join (19/20) (ff8e25fb94208d3c27f549a1e24757ea_e8388ada9c03cfdb1446bb3ccfbd461b_18_0) switched from RUNNING to FAILED on d569c5db-6882-496b-9e92-8a40bb631784 @ localhost (dataPort=-1).
java.lang.NullPointerException: null
    at org.apache.flink.streaming.api.operators.TimerSerializer.serialize(TimerSerializer.java:149) ~[flink-streaming-java-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.api.operators.TimerSerializer.serialize(TimerSerializer.java:39) ~[flink-streaming-java-1.17.1.jar:1.17.1]
    at org.apache.flink.state.changelog.ChangelogKeyGroupedPriorityQueue.lambda$logRemoval$1(ChangelogKeyGroupedPriorityQueue.java:153) ~[flink-statebackend-changelog-1.17.1.jar:1.17.1]
    at org.apache.flink.state.changelog.AbstractStateChangeLogger.lambda$serialize$4(AbstractStateChangeLogger.java:184) ~[flink-statebackend-changelog-1.17.1.jar:1.17.1]
    at org.apache.flink.state.changelog.AbstractStateChangeLogger.serializeRaw(AbstractStateChangeLogger.java:193) ~[flink-statebackend-changelog-1.17.1.jar:1.17.1]
    at org.apache.flink.state.changelog.AbstractStateChangeLogger.serialize(AbstractStateChangeLogger.java:178) ~[flink-statebackend-changelog-1.17.1.jar:1.17.1]
    at org.apache.flink.state.changelog.AbstractStateChangeLogger.log(AbstractStateChangeLogger.java:151) ~[flink-statebackend-changelog-1.17.1.jar:1.17.1]
    at org.apache.flink.state.changelog.AbstractStateChangeLogger.valueElementRemoved(AbstractStateChangeLogger.java:125) ~[flink-statebackend-changelog-1.17.1.jar:1.17.1]
    at org.apache.flink.state.changelog.ChangelogKeyGroupedPriorityQueue.logRemoval(ChangelogKeyGroupedPriorityQueue.java:153) ~[flink-statebackend-changelog-1.17.1.jar:1.17.1]
    at org.apache.flink.state.changelog.ChangelogKeyGroupedPriorityQueue.poll(ChangelogKeyGroupedPriorityQueue.java:69) ~[flink-statebackend-changelog-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:301) ~[flink-streaming-java-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:180) ~[flink-streaming-java-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:602) ~[flink-streaming-java-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:609) ~[flink-streaming-java-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark2(AbstractStreamOperator.java:618) ~[flink-streaming-java-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitWatermark(StreamTwoInputProcessorFactory.java:268) ~[flink-streaming-java-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:200) ~[flink-streaming-java-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:115) ~[flink-streaming-java-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:148) ~[flink-streaming-java-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110) ~[flink-streaming-java-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-streaming-java-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:85) ~[flink-streaming-java-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550) ~[flink-streaming-java-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-streaming-java-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839) ~[flink-streaming-java-1.17.1.jar:1.17.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788) ~[flink-streaming-java-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) ~[flink-runtime-1.17.1.jar:1.17.1]
    at