Hi,
1、NPE问题,https://github.com/apache/flink/blob/7a490d85ab4113f859e5ca8e2cd163439452c221/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/collector/WrappingCollector.java#L43
在这里collect的时候DEBUG发现this.collector为null, https://imgchr.com/i/D0Hm5Q 这里我打了断点;
“第一次调用正常” 是指第一次进断点collector不是null https://imgchr.com/i/D0HuCj 
(这是第一次进断点),第二次进断点就变成null了 https://imgchr.com/i/D0Hm5Q(这是第二次进断点)


2、udtf代码:
@FunctionHint(output = @DataTypeHint("ROW<high52 DOUBLE, low52 DOUBLE>"))
public class HighLow52Func extends TableFunction<Row> {
    public void eval(Double high, Double low, @DataTypeHint("DECIMAL(18,4)") 
BigDecimal highPrice52, @DataTypeHint("DECIMAL(18,4)") BigDecimal lowPrice52){
        double resultHigh52 = highPrice52 == null ? high : Math.max(high, 
highPrice52.doubleValue());
        double resultLow52 = lowPrice52 == null ? low : Math.min(low, 
lowPrice52.doubleValue());
        Row row = Row.of(resultHigh52, resultLow52);
        collect(row);
    }
}


3、flink版本是1.11.2
4、执行的sql就是Table table = bsTableEnv.sqlQuery("SELECT 
HighLow52Func(a.High,a.Low,b.HighPrice52,b.LowPrice52) from market_data a left 
join budget_data b on a.Code = b.Security_Code");


PS: 刚接触flink sql,udtf这块有点想当然了没仔细看文档。又看了下官网的示例,应该是我的使用姿势错了,按照示例改了下SQL,问题已解决,感谢解答~














在 2020-11-26 19:26:47,"hailongwang" <18868816...@163.com> 写道:
>Hi,
>   +I 表示是一条 insert 的数据,其它类型的可以查看:
>  
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/types/RowKind.java
>
>
>   其中 NPE 问题,你是用哪个版本的呢? 方便复制下 sql 和 udtf 吗,我看下能不能复现。
>  PS:“第一次调用正常” 是指?
>
>
>Best,
>Hailong
>
>在 2020-11-26 17:25:03,"bulterman" <15618338...@163.com> 写道:
>>Hi all,
>>我在使用udtf 调用collect方法的时候出现空指针异常,显示collector对象为null,第一次调用正常,第二次调用数据被标记了 
>>“INSERT”是什么意思?
>>https://imgchr.com/i/D0HuCj
>>https://imgchr.com/i/D0Hm5Q
>>udtf代码:
>>https://imgchr.com/i/D0HeUg

回复