我自定义了一个 AggregateFunction，用 HLL（HyperLogLog）近似计算 UV。问题是 HyperLogLog 来自第三方包，如何让 Flink 识别这个累加器类型？我不知道对应的 TypeInformation 该怎么写。
代码如下: import io.airlift.slice.Slices; import io.airlift.stats.cardinality.HyperLogLog; import org.apache.flink.table.functions.AggregateFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Iterator; public class FlinkUDAFCardinalityEstimationFunction extends AggregateFunction<Long, HyperLogLog> { private static final Logger LOG = LoggerFactory.getLogger(JsonArrayParseUDTF.class); private static final int NUMBER_OF_BUCKETS = 4096; @Override public HyperLogLog createAccumulator() { return HyperLogLog.newInstance(NUMBER_OF_BUCKETS); } @Override public Long getValue(HyperLogLog acc) { if(acc == null){ return 0L; } return acc.cardinality(); } public void accumulate(HyperLogLog acc, String element) { if(element == null){ return; } acc.add(Slices.utf8Slice(element)); } public void retract(HyperLogLog acc, byte[] element) { // do nothing LOG.info("-- retract:" + new String(element)); } public void merge(HyperLogLog acc, Iterable<HyperLogLog> it) { Iterator<HyperLogLog> iter = it.iterator(); while (iter.hasNext()) { HyperLogLog a = iter.next(); if(a != null) { acc.mergeWith(a); } } } public void resetAccumulator(HyperLogLog acc) { acc = HyperLogLog.newInstance(NUMBER_OF_BUCKETS); } }