[ https://issues.apache.org/jira/browse/FLINK-28562?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17568279#comment-17568279 ]
Yue Shang commented on FLINK-28562: ----------------------------------- ok, i will reach out to the community for help. thank you. > Rocksdb state backend is too slow when using AggregateFunction > -------------------------------------------------------------- > > Key: FLINK-28562 > URL: https://issues.apache.org/jira/browse/FLINK-28562 > Project: Flink > Issue Type: Bug > Affects Versions: 1.13.2, 1.14.3 > Environment: {code:java} > final ParameterTool params = ParameterTool.fromArgs(args); > // set up the execution environment > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > // get input data > DataStream<UserTag> source = env.addSource(new SourceFunction<UserTag>() { > @Override > public void run(SourceContext ctx) throws Exception { > Random rd = new Random(); > while (true){ > UserTag userTag = new UserTag(); > userTag.setUserId(rd.nextLong()); > userTag.setMetricName(UUID.randomUUID().toString()); > userTag.setTagDimension(UUID.randomUUID().toString()); > userTag.setTagValue(rd.nextDouble()); > userTag.setTagTime( new Long(new Date().getTime()).intValue()); > userTag.setMonitorTime(new Long(new Date().getTime()).intValue()); > userTag.setAggregationPeriod("5s"); > userTag.setAggregationType("sum"); > userTag.setTimePeriod("hour"); > userTag.setDataType("number"); > userTag.setBaseTime(1657803600); > userTag.setTopic(UUID.randomUUID().toString()); > for(int i = 0;i<100;i++){ > userTag.setUserTagName(UUID.randomUUID() + "-"+i); > ctx.collect(userTag); > } > Thread.sleep(1); > } > } > @Override > public void cancel() { > } > }); > source.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5))) > .aggregate(new AggregateFunction<UserTag, Map<String,UserTag>, > List<UserTag>>(){ > @Override > public Map<String, UserTag> createAccumulator() { > return new HashMap<>(); > } > @Override > public Map<String, UserTag> add(UserTag userTag, Map<String, > UserTag> stringUserTagMap) { > 
stringUserTagMap.put(userTag.getUserTagName(),userTag); > return stringUserTagMap; > } > @Override > public List<UserTag> getResult(Map<String, UserTag> > stringUserTagMap) { > return new ArrayList<>(stringUserTagMap.values()); > } > @Override > public Map<String, UserTag> merge(Map<String, UserTag> acc1, > Map<String, UserTag> acc2) { > acc1.putAll(acc2); > return acc1; > } > }) > .setParallelism(1) > .name("NewUserTagAggregation_5s") > .print().setParallelism(2); > // execute program > env.execute(); {code} > !image-2022-07-15-14-19-21-200.png! > !image-2022-07-15-14-19-41-678.png! > Reporter: Yue Shang > Priority: Major > Attachments: image-2022-07-15-14-19-21-200.png, > image-2022-07-15-14-19-41-678.png, image-2022-07-18-19-56-45-650.png, > image-2022-07-18-19-59-18-872.png, image-2022-07-18-20-34-54-407.png, > image-2022-07-18-20-36-23-021.png > > > The RocksDB state backend is too slow when using AggregateFunction. > It only supports a throughput of about 300 per second when using a Map<String,Object>. -- This message was sent by Atlassian Jira (v8.20.10#820010)