你好,我的朋友:

      我使用的是 Flink 1.10 Blink Planer。
      我想构造一个Flink UDF ,这个 UDF 可以根据不同的参数返回不同的类型。


      为什么我想要这个功能:
      场景1: 我的数据是一个 pb 的 bytes,我想从里面获取数据,如果统一的返回 string,后面还需要 cast 比较繁琐,如果使用 
get_int、get_double、get_string 这样的方式,实现起来又非常多
     场景2: 我的数据是一个 Json ,问题同上。
  
     在场景1中,我改了下 Flink 的源码,在 ScalarFunction 中加了一个初始化方法,在Flink 初始化 scalar 
function 的时候,进行相关的初始化
@Override
public void initialize(LogicalType[] sqlTypes, String[] paramNames) {
// 在这个函数里面做一些事情,比如,我可以根据 paramNames 去取 pb 的 schema 信息,拿到类型信息,这样就可以动态的设置类型
}
    这个方法很有效果,他帮我们 workaround 了一段时间,目前依然work。只是有些不是那么优雅。
    这个case 就是我想要的一个,不过,目前这个会返回 RAW('java.lang.Object', ?) 这个类型不进行 cast 是无法直接使用的。
public class TimestampTest extends ScalarFunction {

public Object eval(long timestamp, String pattern, int num) {
        Timestamp timestamp1 = new Timestamp(timestamp);
SimpleDateFormat sdf = new SimpleDateFormat(pattern);
        if (num < 4) {
//返回 STRING 类型
return String.valueOf(timestamp);
}
if (num < 6) {
//返回 BIGINT
return timestamp - 100;
}
if (num < 8) {
//返回 DOUBLE
double ss = 0.9;
            return (double) timestamp + ss;
}
//返回 STRING
return sdf.format(timestamp1);
}
}

回复