[jira] [Commented] (FLINK-28562) Rocksdb state backend is too slow when using AggregateFunction
[ https://issues.apache.org/jira/browse/FLINK-28562?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17568279#comment-17568279 ]

Yue Shang commented on FLINK-28562:
---

OK, I will reach out to the community for help. Thank you.

> Rocksdb state backend is too slow when using AggregateFunction
> --
>
> Key: FLINK-28562
> URL: https://issues.apache.org/jira/browse/FLINK-28562
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.13.2, 1.14.3
> Environment: {code:java}
> final ParameterTool params = ParameterTool.fromArgs(args);
> // set up the execution environment
> final StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
> // get input data
> DataStream<UserTag> source = env.addSource(new SourceFunction<UserTag>() {
>     @Override
>     public void run(SourceContext<UserTag> ctx) throws Exception {
>         Random rd = new Random();
>         while (true) {
>             UserTag userTag = new UserTag();
>             userTag.setUserId(rd.nextLong());
>             userTag.setMetricName(UUID.randomUUID().toString());
>             userTag.setTagDimension(UUID.randomUUID().toString());
>             userTag.setTagValue(rd.nextDouble());
>             userTag.setTagTime(new Long(new Date().getTime()).intValue());
>             userTag.setMonitorTime(new Long(new Date().getTime()).intValue());
>             userTag.setAggregationPeriod("5s");
>             userTag.setAggregationType("sum");
>             userTag.setTimePeriod("hour");
>             userTag.setDataType("number");
>             userTag.setBaseTime(1657803600);
>             userTag.setTopic(UUID.randomUUID().toString());
>             // 100 records per iteration, each with a distinct userTagName
>             for (int i = 0; i < 100; i++) {
>                 userTag.setUserTagName(UUID.randomUUID() + "-" + i);
>                 ctx.collect(userTag);
>             }
>             Thread.sleep(1);
>         }
>     }
>
>     @Override
>     public void cancel() {
>     }
> });
> source.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
>         .aggregate(new AggregateFunction<UserTag, Map<String, UserTag>, List<UserTag>>() {
>             @Override
>             public Map<String, UserTag> createAccumulator() {
>                 return new HashMap<>();
>             }
>
>             @Override
>             public Map<String, UserTag> add(UserTag userTag, Map<String, UserTag> stringUserTagMap) {
>                 // keep the latest UserTag per userTagName
>                 stringUserTagMap.put(userTag.getUserTagName(), userTag);
>                 return stringUserTagMap;
>             }
>
>             @Override
>             public List<UserTag> getResult(Map<String, UserTag> stringUserTagMap) {
>                 return new ArrayList<>(stringUserTagMap.values());
>             }
>
>             @Override
>             public Map<String, UserTag> merge(Map<String, UserTag> acc1, Map<String, UserTag> acc2) {
>                 acc1.putAll(acc2);
>                 return acc1;
>             }
>         })
>         .setParallelism(1)
>         .name("NewUserTagAggregation_5s")
>         .print().setParallelism(2);
> // execute program
> env.execute();
> {code}
> !image-2022-07-15-14-19-21-200.png!
> !image-2022-07-15-14-19-41-678.png!
> Reporter: Yue Shang
> Priority: Major
> Attachments: image-2022-07-15-14-19-21-200.png, image-2022-07-15-14-19-41-678.png, image-2022-07-18-19-56-45-650.png, image-2022-07-18-19-59-18-872.png, image-2022-07-18-20-34-54-407.png, image-2022-07-18-20-36-23-021.png
>
>
> The RocksDB state backend is too slow when using an AggregateFunction: with a Map accumulator, the job only handles about 300 records per second.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
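One likely explanation, not confirmed in this thread: with the RocksDB backend, the window's aggregating state is stored as serialized bytes, so each add() reads the accumulator back, deserializes it, applies the AggregateFunction, and serializes the whole accumulator again. Because every record in the reproduction has a distinct userTagName, the Map grows by one entry per record and the work per window grows quadratically, whereas the hashmap backend updates the Java object in place. The plain-Java sketch below is only a hypothetical cost model under that assumption (AccumulatorCostModel and readModifyWriteCost are illustrative names, not Flink APIs); it counts how many map entries are (de)serialized per window.

```java
/**
 * Hypothetical cost model (plain Java, not Flink code) for a window accumulator
 * that is stored in serialized form and updated by read-modify-write.
 */
public class AccumulatorCostModel {

    /**
     * Map entries (de)serialized while {@code records} records with distinct keys
     * are added to one window's Map accumulator, ASSUMING every add() deserializes
     * the whole stored map, inserts one entry, and serializes the whole map again.
     */
    static long readModifyWriteCost(int records) {
        long touched = 0;
        long size = 0; // entries currently in the stored accumulator
        for (int i = 0; i < records; i++) {
            touched += size; // read: deserialize every existing entry
            size++;          // add(): put one new distinct key
            touched += size; // write: serialize the whole updated map
        }
        return touched;      // sum of (2i + 1) for i < records, i.e. records * records
    }

    public static void main(String[] args) {
        // 300 records/s (the throughput reported in this issue) over a 5 s tumbling window.
        int recordsPerWindow = 300 * 5;
        System.out.println(recordsPerWindow + " records -> "
                + readModifyWriteCost(recordsPerWindow) + " entry (de)serializations");
        // prints "1500 records -> 2250000 entry (de)serializations"
    }
}
```

At the roughly 300 records/s reported above, a 5 s window holds 1,500 records, which under this model already means 2,250,000 entry (de)serializations per window, and each entry is a UserTag carrying several UUID strings.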
[jira] [Commented] (FLINK-28562) Rocksdb state backend is too slow when using AggregateFunction
[ https://issues.apache.org/jira/browse/FLINK-28562?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17567969#comment-17567969 ]

Yue Shang commented on FLINK-28562:
---

Sorry, I forgot to include my environment options:

-Dstate.backend.incremental=true -Drest.flamegraph.enabled=true -Dstate.backend=rocksdb

With RocksDB, 0.numRecordsInPerSecond is only 170/s; when state.backend is changed to hashmap, 0.numRecordsInPerSecond is 7/s.

I also registered the Kryo types:
{code:java}
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().registerKryoType(Map.class);
env.getConfig().registerKryoType(UserTag.class);
env.getConfig().registerKryoType(List.class);
{code}
but that did not help either; throughput is still only about 300/s. The flame graph looks very bad:
!image-2022-07-18-20-34-54-407.png!
!image-2022-07-18-20-36-23-021.png!

Full application-mode JobManager configuration:
{code}
$internal.application.main: WindowAggTest
$internal.application.program-args: --nub;1
$internal.deployment.config-dir: /data/services/-202207062015-master-7cf96f44/bin/bylink/flink/flink-env/flink-1.14.3/conf
$internal.yarn.log-config-file: /data/services/-202207062015-master-7cf96f44/bin/bylink/flink/flink-env/flink-1.14.3/conf/log4j.properties
classloader.resolve-order: parent-first
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
execution.checkpointing.interval: 600s
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.timeout: 60s
execution.checkpointing.tolerable-failed-checkpoints: 3
execution.target: embedded
high-availability.cluster-id: application_***_
internal.cluster.execution-mode: NORMAL
internal.io.tmpdirs.use-local-default: true
io.tmp.dirs: /data/storage/yarn/local/usercache/*/appcache/application_***_
jobmanager.archive.fs.dir: hdfs:////flink/completed-jobs/
jobmanager.memory.heap.size: 469762048b
jobmanager.memory.jvm-metaspace.size: 268435456b
jobmanager.memory.jvm-overhead.max: 201326592b
jobmanager.memory.jvm-overhead.min: 201326592b
jobmanager.memory.off-heap.size: 134217728b
jobmanager.memory.process.size: 1024m
jobmanager.rpc.address:
jobmanager.rpc.port:
parallelism.default: 1
pipeline.classpaths:
pipeline.jars: file://container_/WindowsAggTest-1.0-SNAPSHOT.jar
pipeline.name: 1757_Admin_aggTest
rest.address: **
rest.flamegraph.enabled: true
state.backend: rocksdb
state.backend.incremental: true
state.checkpoint-storage: filesystem
state.checkpoints.dir: hdfs:///**/flink/checkpoints/1757
state.checkpoints.num-retained: 10
state.savepoints.dir: hdfs:///**/flink/checkpoints/1757
taskmanager.memory.process.size: 8192m
taskmanager.numberOfTaskSlots: 1
web.port: 0
web.tmpdir: /tmp/flink-web-8
yarn.application.name: 1757_Admin_aggTest
yarn.application.queue:
yarn.application.type: Apache Flink 1.14.3
yarn.flink-dist-jar: hdfs:///***/flink/flink-env/flink-1.14.3/lib/flink-dist_2.12-1.14.3.jar
yarn.provided.lib.dirs: hdfs:///*/flink/flink-env/flink-1.14.3/lib
{code}
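Registering Kryo types most likely only changes how class identities are encoded; if the read-modify-write explanation is right, it does not change the fact that the entire accumulator is rewritten for every record. A workaround not discussed in this thread, offered here only as an assumption, is to replace .aggregate(...) with .process(...) and a ProcessAllWindowFunction: the window contents are then kept in list state, which the RocksDB backend appends to without reading the existing entries back, and the de-duplication done by the reporter's Map can run once when the window fires. The plain-Java sketch below models only that access pattern; AppendThenDedupe, Tag, AppendOnlyBuffer, and fire() are hypothetical names, not Flink APIs.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch (plain Java, not Flink): buffer each record with an
 * append-only write, then de-duplicate by name once when the window fires.
 */
public class AppendThenDedupe {

    /** Minimal stand-in for the reporter's UserTag; only the fields used here. */
    static final class Tag {
        final String name;
        final double value;

        Tag(String name, double value) {
            this.name = name;
            this.value = value;
        }
    }

    /** Models append-only list state: add() never reads the existing entries. */
    static final class AppendOnlyBuffer {
        private final List<Tag> entries = new ArrayList<>();
        long entriesWritten = 0;

        void add(Tag t) {
            entries.add(t);
            entriesWritten++; // one entry written per record, regardless of buffer size
        }

        /** Called once per window: the last value per name wins, as with Map.put. */
        List<Tag> fire() {
            Map<String, Tag> latest = new LinkedHashMap<>(); // deterministic output order
            for (Tag t : entries) {
                latest.put(t.name, t);
            }
            entries.clear(); // the window's contents are discarded after firing
            return new ArrayList<>(latest.values());
        }
    }

    public static void main(String[] args) {
        AppendOnlyBuffer window = new AppendOnlyBuffer();
        window.add(new Tag("a", 1.0));
        window.add(new Tag("b", 2.0));
        window.add(new Tag("a", 3.0));
        for (Tag t : window.fire()) {
            System.out.println(t.name + "=" + t.value);
        }
    }
}
```

Running main prints a=3.0 and then b=2.0: the later value for a replaces the earlier one, like stringUserTagMap.put(...) in the original AggregateFunction. The LinkedHashMap only makes the output order deterministic; the original getResult returns HashMap values in no particular order.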