[ https://issues.apache.org/jira/browse/FLINK-19655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17216395#comment-17216395 ]
Leonard Xu commented on FLINK-19655: ------------------------------------ [~seunjjs] I reviewed the PR, you can have a look. ^_^ > NPE when using blink planner and TemporalTableFunction after setting > IdleStateRetentionTime > -------------------------------------------------------------------------------------------- > > Key: FLINK-19655 > URL: https://issues.apache.org/jira/browse/FLINK-19655 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner > Affects Versions: 1.10.0 > Reporter: seunjjs > Assignee: seunjjs > Priority: Major > Labels: pull-request-available > > My Code here: > {code:java} > EnvironmentSettings bsSettings = > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); > final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, > bsSettings); > tableEnv.getConfig().setIdleStateRetentionTime(Time.seconds(60), > Time.seconds(600)); > final Table table = tableEnv.from("tableName"); > final TableFunction<?> function = table.createTemporalTableFunction( > temporalTableEntry.getTimeAttribute(), > String.join(",", > temporalTableEntry.getPrimaryKeyFields())); > tableEnv.registerFunction(temporalTableEntry.getName(), function); > {code} > And NPE throwed when I executed my program. > {code:java} > java.lang.NullPointerException > at > org.apache.flink.table.runtime.operators.join.temporal.BaseTwoInputStreamOperatorWithStateRetention.registerProcessingCleanupTimer(BaseTwoInputStreamOperatorWithStateRetention.java:109) > at > org.apache.flink.table.runtime.operators.join.temporal.TemporalProcessTimeJoinOperator.processElement2(TemporalProcessTimeJoinOperator.java:98) > at > org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processRecord2(StreamTwoInputProcessor.java:145) > at > org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.lambda$new$1(StreamTwoInputProcessor.java:107) > at > org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessor.java:362) > at > org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151) > at > org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128) > at > org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:185) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) > at java.lang.Thread.run(Thread.java:748) > {code} > > And When I changed to useOldPlanner, it worked fine.And when I debuged the > code ,I found BaseTwoInputStreamOperatorWithStateRetention#open did not be > executed. > Here is BaseTwoInputStreamOperatorWithStateRetention#open code. > {code:java} > public void open() throws Exception { > initializeTimerService(); > if (stateCleaningEnabled) { > ValueStateDescriptor<Long> cleanupStateDescriptor = > new ValueStateDescriptor<>(CLEANUP_TIMESTAMP, > Types.LONG); > latestRegisteredCleanupTimer = > getRuntimeContext().getState(cleanupStateDescriptor); > } > } > {code} > Here is TemporalProcessTimeJoinOperator#open code. > {code:java} > public void open() throws Exception { > this.joinCondition = > generatedJoinCondition.newInstance(getRuntimeContext().getUserCodeClassLoader()); > FunctionUtils.setFunctionRuntimeContext(joinCondition, > getRuntimeContext()); > FunctionUtils.openFunction(joinCondition, new Configuration()); > ValueStateDescriptor<BaseRow> rightStateDesc = new > ValueStateDescriptor<>("right", rightType); > this.rightState = getRuntimeContext().getState(rightStateDesc); > this.collector = new TimestampedCollector<>(output); > this.outRow = new JoinedRow(); > // consider watermark from left stream only. > super.processWatermark2(Watermark.MAX_WATERMARK); > } > {code} > I compared the code with oldplaner(TemporalProcessTimeJoin#open).May be > TemporalProcessTimeJoinOperator#open should add super.open()? > Here is TemporalProcessTimeJoin#open code. > {code:scala} > override def open(): Unit = { > LOG.debug(s"Compiling FlatJoinFunction: $genJoinFuncName \n\n > Code:\n$genJoinFuncCode") > val clazz = compile( > getRuntimeContext.getUserCodeClassLoader, > genJoinFuncName, > genJoinFuncCode) > LOG.debug("Instantiating FlatJoinFunction.") > joinFunction = clazz.newInstance() > FunctionUtils.setFunctionRuntimeContext(joinFunction, getRuntimeContext) > FunctionUtils.openFunction(joinFunction, new Configuration()) > val rightStateDescriptor = new ValueStateDescriptor[Row]("right", > rightType) > rightState = getRuntimeContext.getState(rightStateDescriptor) > collector = new TimestampedCollector[CRow](output) > cRowWrapper = new CRowWrappingCollector() > cRowWrapper.out = collector > super.open() > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)