退订
Re: 请教flink cep如何对无序数据处理
这样可以嘛 > Pattern pattern = Pattern.begin("start").where(new > SimpleCondition() { >@Override >public boolean filter(Event value) throws Exception { > return value.getName().equals("a") || value.getName().equals("b"); >} > }).next("next").where(new IterativeCondition() { >@Override >public boolean filter(Event value, Context ctx) throws Exception { > Event last = ctx.getEventsForPattern("start").iterator().next(); > if(last != null){ > if( (last.getName().equals("a") && value.getName().equals("b")) || > (last.getName().equals("b") && value.getName().equals("a"))){ > return true; > } > } > return false; >} > > > sherlock zw mailto:zw30...@live.com>> 于2021年5月14日周五 > 上午8:52写道: > 兄弟们,想问下Flink的CEP能够对无序数据流进行处理匹配嘛? > 我这里指的无序事件是:例如有两个事件,事件A和事件B,在一个时间窗口内,只要匹配到了A和B,不论A和B的到来顺序,我都认为是符合我的条件 > >
flink checkpoint 数据清理问题
背景:1.flink 集群模式 standalone HA ,共三台,zk选举jobmanager,1 active 1 standby 2.文件系统由于公司原因,没有用hdfs,用的本地文件系统 3.backend用的增量rocksdb 配置情况: 进程分布情况如下: 问题:checkpoint数据在01(01是主jobmanager)上占用很小,但在02、03节点增长特别快,目前已占用数据盘90%的存储。 但由于使用的是增量rocksdb,不敢随意删chekpoint数据,想问问大佬们: 1.是否有办法让flink自动清理过期checkpoint,还是只能通过加磁盘方式解决? 2.我一直认为checkpoint数据是由主jobmanager触发写的,那么在当前情况应该只有01节点会写入checkpoint(本地磁盘),为什么02、03节点也会写入checkpoint,而且写入量比01大那么多?
????
flink sql支持Common Table Expression (CTE)吗?
flink sql支持Common Table Expression (CTE)吗?是不是可以通过 create temporary view xxx 来实现?CTE和temporary view的区别是什么? 例如 with toronto_ppl as ( SELECT DISTINCT name FROM population WHERE country = "Canada" AND city = "Toronto" ) , avg_female_salary as ( SELECT AVG(salary) as avgSalary FROM salaries WHERE gender = "Female" ) SELECT name , salary FROM People WHERE name in (SELECT DISTINCT FROM toronto_ppl) AND salary >= (SELECT avgSalary FROM avg_female_salary)
flink sql支持创建临时函数吗?
如下 CREATE TEMPORARY FUNCTION get_seniority(tenure INT64) AS ( CASE WHEN tenure < 1 THEN "analyst" WHEN tenure BETWEEN 1 and 3 THEN "associate" WHEN tenure BETWEEN 3 and 5 THEN "senior" WHEN tenure > 5 THEN "vp" ELSE "n/a" END ); SELECT name , get_seniority(tenure) as seniority FROM employees
????????????
退订
退订
退订
退订
Flink Jdbc Connector close方法线程同步
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcBatchingOutputFormat.java 为什么上诉代码中的close方法要进行线程同步,这个是什么考虑。