Hi, 1、NPE问题,https://github.com/apache/flink/blob/7a490d85ab4113f859e5ca8e2cd163439452c221/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/collector/WrappingCollector.java#L43 在这里collect的时候DEBUG发现this.collector为null, https://imgchr.com/i/D0Hm5Q 这里我打了断点; “第一次调用正常” 是指第一次进断点collector不是null https://imgchr.com/i/D0HuCj (这是第一次进断点),第二次进断点就变成null了 https://imgchr.com/i/D0Hm5Q(这是第二次进断点)
2、udtf代码: @FunctionHint(output = @DataTypeHint("ROW<high52 DOUBLE, low52 DOUBLE>")) public class HighLow52Func extends TableFunction<Row> { public void eval(Double high, Double low, @DataTypeHint("DECIMAL(18,4)") BigDecimal highPrice52, @DataTypeHint("DECIMAL(18,4)") BigDecimal lowPrice52){ double resultHigh52 = highPrice52 == null ? high : Math.max(high, highPrice52.doubleValue()); double resultLow52 = lowPrice52 == null ? low : Math.min(low, lowPrice52.doubleValue()); Row row = Row.of(resultHigh52, resultLow52); collect(row); } } 3、flink版本是1.11.2 4、执行的sql就是Table table = bsTableEnv.sqlQuery("SELECT HighLow52Func(a.High,a.Low,b.HighPrice52,b.LowPrice52) from market_data a left join budget_data b on a.Code = b.Security_Code"); PS: 刚接触flink sql,udtf这块有点想当然了没仔细看文档。又看了下官网的示例,应该是我的使用姿势错了,按照示例改了下SQL,问题已解决,感谢解答~ 在 2020-11-26 19:26:47,"hailongwang" <18868816...@163.com> 写道: >Hi, > +I 表示是一条 insert 的数据,其它类型的可以查看: > > https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/types/RowKind.java > > > 其中 NPE 问题,你是用哪个版本的呢? 方便复制下 sql 和 udtf 吗,我看下能不能复现。 > PS:“第一次调用正常” 是指? > > >Best, >Hailong > >在 2020-11-26 17:25:03,"bulterman" <15618338...@163.com> 写道: >>Hi all, >>我在使用udtf 调用collect方法的时候出现空指针异常,显示collector对象为null,第一次调用正常,第二次调用数据被标记了 >>“INSERT”是什么意思? >>https://imgchr.com/i/D0HuCj >>https://imgchr.com/i/D0Hm5Q >>udtf代码: >>https://imgchr.com/i/D0HeUg