?????? flink 1.11 interval join??????rocksdb????????????
??slotrocksdb block_cache_usage??usageblock_cache_capacity?? mem table size?? ---- ??: "user-zh"
flink 1.11 interval join??????rocksdb????????????
Hi, flink 1.11 on k8s join??sql??rocksdb backend??flink managed flink??state.backend.rocksdb.memory.managed=true k8s??pod flink sql: insert into console_sink select t1.*, t2.* from t1 left join t2 on t1.unique_id = t2.unique_id and t1.event_time BETWEEN t2.event_time - INTERVAL '1' HOUR AND t2.event_time + INTERVAL '1' HOUR ?? state.backend=rocksdb; state.backend.incremental=false; state.backend.rocksdb.memory.managed=true state.idle.retention.mintime='10 min'; state.idle.retention.maxtime='20 min'; checkpoint.time.interval='15 min'; source.idle-timeout='6 ms'; taskmanager.memory.flink.size =55 gb taskmanager.memory.managed.fraction=0.85 ?? 1. checkpoint??size??200G??state 2. k8s pod pod pod??pod 3. Prometheus??metrics, rocksdb_block_cache_usage rocksdb_block_cache_capacity rocksdb_block_cache_usage flink managed flink??rocksdb??rocksdb_block_cache_usage
??????flink sql count distinct??????????
---- ??: "user-zh"
flink sql count distinct??????????
??flink sql dau group by count distinct user_id??table.optimizer.distinct-agg.split.enabled=true job??mysql 2020-10-10 19:00:00 100 2020-10-10 19:00:02 98 2020-10-10 19:00:04 102 2020-10-10 19:00:06 108 2020-10-10 19:00:08 106 2020-10-10 19:00:10 110 sql?? create table jdbc_sink( date_str varchar , dau bigint, PRIMARY KEY (date_str) NOT ENFORCED ) with ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://xxx', 'table-name' = 'xxx', 'driver' = 'com.mysql.jdbc.Driver', 'username' = 'xxx', 'password' = 'xxx' ); CREATE TABLE action_log_source ( user_id varchar, event_time TIMESTAMP(3), WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND ) with ( ... ); INSERT INTO jdbc_sink SELECT day_str as date_str, COUNT(DISTINCT user_id) AS dau FROM ( select user_id as user_id, date_format(event_time, 'yyyy-MM-dd') as day_str from action_log_source ) GROUP BY day_str
flink??????????udf????????????????????????
hi, ??udf udf?? udf public class GetTimeFunc extends ScalarFunction { public String eval() { return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()); } } udf??flink
?????? flink lookup join ????????????????????????
??note_id??binlog??note_id?? ??note_id ---- ??:"Benchao Li"