Yue Shang created FLINK-28562: --------------------------------- Summary: Rocksdb state backend is too slow when using AggregateFunction Key: FLINK-28562 URL: https://issues.apache.org/jira/browse/FLINK-28562 Project: Flink Issue Type: Bug Affects Versions: 1.14.3, 1.13.2 Environment: {code:java} final ParameterTool params = ParameterTool.fromArgs(args);
// set up the execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // get input data DataStream<UserTag> source = env.addSource(new SourceFunction<UserTag>() { @Override public void run(SourceContext ctx) throws Exception { Random rd = new Random(); while (true){ UserTag userTag = new UserTag(); userTag.setUserId(rd.nextLong()); userTag.setMetricName(UUID.randomUUID().toString()); userTag.setTagDimension(UUID.randomUUID().toString()); userTag.setTagValue(rd.nextDouble()); userTag.setTagTime( new Long(new Date().getTime()).intValue()); userTag.setMonitorTime(new Long(new Date().getTime()).intValue()); userTag.setAggregationPeriod("5s"); userTag.setAggregationType("sum"); userTag.setTimePeriod("hour"); userTag.setDataType("number"); userTag.setBaseTime(1657803600); userTag.setTopic(UUID.randomUUID().toString()); for(int i = 0;i<100;i++){ userTag.setUserTagName(UUID.randomUUID() + "-"+i); ctx.collect(userTag); } Thread.sleep(1); } } @Override public void cancel() { } }); source.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5))) .aggregate(new AggregateFunction<UserTag, Map<String,UserTag>, List<UserTag>>(){ @Override public Map<String, UserTag> createAccumulator() { return new HashMap<>(); } @Override public Map<String, UserTag> add(UserTag userTag, Map<String, UserTag> stringUserTagMap) { stringUserTagMap.put(userTag.getUserTagName(),userTag); return stringUserTagMap; } @Override public List<UserTag> getResult(Map<String, UserTag> stringUserTagMap) { return new ArrayList<>(stringUserTagMap.values()); } @Override public Map<String, UserTag> merge(Map<String, UserTag> acc1, Map<String, UserTag> acc2) { acc1.putAll(acc2); return acc1; } }) .setParallelism(1) .name("NewUserTagAggregation_5s") .print().setParallelism(2); // execute program env.execute(); {code} !image-2022-07-15-14-19-21-200.png! !image-2022-07-15-14-19-41-678.png! 
Reporter: Yue Shang Attachments: image-2022-07-15-14-19-21-200.png, image-2022-07-15-14-19-41-678.png The RocksDB state backend is too slow when using an AggregateFunction. It sustains a throughput of only about 300 events per second when using a Map<String,Object>. -- This message was sent by Atlassian Jira (v8.20.10#820010)