[jira] [Commented] (FLINK-28562) Rocksdb state backend is too slow when using AggregateFunction

2022-07-18 Thread Yue Shang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28562?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17568279#comment-17568279
 ] 

Yue Shang commented on FLINK-28562:
---

OK, I will reach out to the community for help.

Thank you.

> Rocksdb state backend is too slow when using AggregateFunction
> --
>
> Key: FLINK-28562
> URL: https://issues.apache.org/jira/browse/FLINK-28562
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.13.2, 1.14.3
> Environment: {code:java}
> final ParameterTool params = ParameterTool.fromArgs(args);
> // set up the execution environment
> final StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
> // get input data: a synthetic source emitting batches of 100 UserTag records
> DataStream<UserTag> source = env.addSource(new SourceFunction<UserTag>() {
>     // lets cancel() stop the emit loop
>     private volatile boolean running = true;
>
>     @Override
>     public void run(SourceContext<UserTag> ctx) throws Exception {
>         Random rd = new Random();
>         while (running) {
>             UserTag userTag = new UserTag();
>             userTag.setUserId(rd.nextLong());
>             userTag.setMetricName(UUID.randomUUID().toString());
>             userTag.setTagDimension(UUID.randomUUID().toString());
>             userTag.setTagValue(rd.nextDouble());
>             userTag.setTagTime(new Long(new Date().getTime()).intValue());
>             userTag.setMonitorTime(new Long(new Date().getTime()).intValue());
>             userTag.setAggregationPeriod("5s");
>             userTag.setAggregationType("sum");
>             userTag.setTimePeriod("hour");
>             userTag.setDataType("number");
>             userTag.setBaseTime(1657803600);
>             userTag.setTopic(UUID.randomUUID().toString());
>             // emit the same tag 100 times under different names, then pause ~1 ms
>             for (int i = 0; i < 100; i++) {
>                 userTag.setUserTagName(UUID.randomUUID() + "-" + i);
>                 ctx.collect(userTag);
>             }
>             Thread.sleep(1);
>         }
>     }
>
>     @Override
>     public void cancel() {
>         running = false;
>     }
> });
> // collect all records of each 5 s window into a Map keyed by userTagName
> source.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
>         .aggregate(new AggregateFunction<UserTag, Map<String, UserTag>, List<UserTag>>() {
>             @Override
>             public Map<String, UserTag> createAccumulator() {
>                 return new HashMap<>();
>             }
>
>             @Override
>             public Map<String, UserTag> add(UserTag userTag, Map<String, UserTag> stringUserTagMap) {
>                 stringUserTagMap.put(userTag.getUserTagName(), userTag);
>                 return stringUserTagMap;
>             }
>
>             @Override
>             public List<UserTag> getResult(Map<String, UserTag> stringUserTagMap) {
>                 return new ArrayList<>(stringUserTagMap.values());
>             }
>
>             @Override
>             public Map<String, UserTag> merge(Map<String, UserTag> acc1, Map<String, UserTag> acc2) {
>                 acc1.putAll(acc2);
>                 return acc1;
>             }
>         })
>         .setParallelism(1)
>         .name("NewUserTagAggregation_5s")
>         .print().setParallelism(2);
> // execute program
> env.execute(); {code}
> !image-2022-07-15-14-19-21-200.png!
> !image-2022-07-15-14-19-41-678.png!
>Reporter: Yue Shang
>Priority: Major
> Attachments: image-2022-07-15-14-19-21-200.png, 
> image-2022-07-15-14-19-41-678.png, image-2022-07-18-19-56-45-650.png, 
> image-2022-07-18-19-59-18-872.png, image-2022-07-18-20-34-54-407.png, 
> image-2022-07-18-20-36-23-021.png
>
>
> The RocksDB state backend is too slow when using an AggregateFunction:
> with a Map accumulator it only handles about 300 records per second.
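As a sanity check on where the bottleneck sits, the reproduction source's maximum rate can be bounded from its own loop: 100 records per iteration, then Thread.sleep(1), which pauses for at least 1 ms. This is a back-of-the-envelope sketch that ignores collect() and serialization overhead; the class and method names are hypothetical.

```java
public class SourceRateBound {
    // Upper bound on records/s for a loop that emits recordsPerIteration records
    // and then sleeps for at least sleepMillis ms (all other overhead ignored).
    static long maxRecordsPerSecond(int recordsPerIteration, long sleepMillis) {
        return recordsPerIteration * (1000L / sleepMillis);
    }

    public static void main(String[] args) {
        System.out.println(maxRecordsPerSecond(100, 1) + " records/s"); // prints "100000 records/s"
    }
}
```

That ceiling is more than 300 times the ~300 records/s reported, which suggests the source itself is not what limits throughput.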



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28562) Rocksdb state backend is too slow when using AggregateFunction

2022-07-18 Thread Yue Shang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28562?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17567969#comment-17567969
 ] 

Yue Shang commented on FLINK-28562:
---

Sorry, I forgot to include my environment options:

-Dstate.backend.incremental=true
-Drest.flamegraph.enabled=true
-Dstate.backend=rocksdb

 

With RocksDB, 0.numRecordsInPerSecond is only 170/s, but it is 7/s when
state.backend is changed to hashmap.

 

I have also registered the Kryo types:
{code:java}
final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().registerKryoType(Map.class);
env.getConfig().registerKryoType(UserTag.class);
env.getConfig().registerKryoType(List.class);{code}
but that does not help either: throughput is still only about 300/s. The flame graph looks very bad.

!image-2022-07-18-20-34-54-407.png!

!image-2022-07-18-20-36-23-021.png!
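A possible explanation for the gap between the two backends, offered as an assumption rather than a conclusion reached in this thread: the RocksDB backend keeps the window accumulator as serialized bytes, so every add() reads and deserializes the whole Map, inserts one entry, and serializes the whole Map back, whereas the hashmap backend keeps the accumulator as a live object. Registering types with Kryo only lets it write a small ID instead of the class name, so it would not remove this per-record cost. The sketch imitates that read-modify-write using plain Java serialization as a stand-in for Kryo, a byte[] as a stand-in for the stored RocksDB value, and String values instead of UserTag; the class and method names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.HashMap;

public class SerializedAccumulatorCost {

    // Serializes the whole accumulator, as a write to serialized state would.
    static byte[] serialize(HashMap<String, String> acc) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(acc);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Deserializes the whole accumulator, as a read from serialized state would.
    @SuppressWarnings("unchecked")
    static HashMap<String, String> deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (HashMap<String, String>) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Simulates n add() calls within one window; returns total bytes serialized.
    static long totalBytesWritten(int n) {
        byte[] state = serialize(new HashMap<>());
        long total = 0;
        for (int i = 0; i < n; i++) {
            HashMap<String, String> acc = deserialize(state); // read the full map
            acc.put("userTag-" + i, "tag");                   // the actual add()
            state = serialize(acc);                           // write the full map back
            total += state.length;
        }
        return total;
    }

    public static void main(String[] args) {
        for (int n : new int[] {500, 1000, 2000}) {
            System.out.println(n + " adds -> " + totalBytesWritten(n) + " bytes serialized");
        }
    }
}
```

Doubling the number of adds roughly quadruples the bytes serialized, i.e. under this assumption the cost per window grows quadratically with the number of entries in the accumulator.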

Full application-mode JobManager configuration:
$internal.application.main WindowAggTest
$internal.application.program-args --nub;1
$internal.deployment.config-dir /data/services/-202207062015-master-7cf96f44/bin/bylink/flink/flink-env/flink-1.14.3/conf
$internal.yarn.log-config-file /data/services/-202207062015-master-7cf96f44/bin/bylink/flink/flink-env/flink-1.14.3/conf/log4j.properties
classloader.resolve-order parent-first
execution.checkpointing.externalized-checkpoint-retention RETAIN_ON_CANCELLATION
execution.checkpointing.interval 600s
execution.checkpointing.mode EXACTLY_ONCE
execution.checkpointing.timeout 60s
execution.checkpointing.tolerable-failed-checkpoints 3
execution.target embedded
high-availability.cluster-id application_***_
internal.cluster.execution-mode NORMAL
internal.io.tmpdirs.use-local-default true
io.tmp.dirs /data/storage/yarn/local/usercache/*/appcache/application_***_
jobmanager.archive.fs.dir hdfs:////flink/completed-jobs/
jobmanager.memory.heap.size 469762048b
jobmanager.memory.jvm-metaspace.size 268435456b
jobmanager.memory.jvm-overhead.max 201326592b
jobmanager.memory.jvm-overhead.min 201326592b
jobmanager.memory.off-heap.size 134217728b
jobmanager.memory.process.size 1024m
jobmanager.rpc.address 
jobmanager.rpc.port 
parallelism.default 1
pipeline.classpaths
pipeline.jars file://container_/WindowsAggTest-1.0-SNAPSHOT.jar
pipeline.name 1757_Admin_aggTest
rest.address **
rest.flamegraph.enabled true
state.backend rocksdb
state.backend.incremental true
state.checkpoint-storage filesystem
state.checkpoints.dir hdfs:///**/flink/checkpoints/1757
state.checkpoints.num-retained 10
state.savepoints.dir hdfs:///**/flink/checkpoints/1757
taskmanager.memory.process.size 8192m
taskmanager.numberOfTaskSlots 1
web.port 0
web.tmpdir /tmp/flink-web-8
yarn.application.name 1757_Admin_aggTest
yarn.application.queue 
yarn.application.type Apache Flink 1.14.3
yarn.flink-dist-jar hdfs:///***/flink/flink-env/flink-1.14.3/lib/flink-dist_2.12-1.14.3.jar
yarn.provided.lib.dirs hdfs:///*/flink/flink-env/flink-1.14.3/lib
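The JobManager memory options in this dump are internally consistent: the four component sizes add up to jobmanager.memory.process.size (1024m), in line with Flink's JobManager memory model (JVM heap + off-heap memory + JVM metaspace + JVM overhead). A small arithmetic check with the values copied from the configuration; the class name is hypothetical.

```java
public class JobManagerMemoryCheck {
    static final long MIB = 1024L * 1024L;

    // Sums the JobManager memory components, in bytes, as listed in the config dump.
    static long totalBytes() {
        long heap = 469_762_048L;      // jobmanager.memory.heap.size (448 MiB)
        long offHeap = 134_217_728L;   // jobmanager.memory.off-heap.size (128 MiB)
        long metaspace = 268_435_456L; // jobmanager.memory.jvm-metaspace.size (256 MiB)
        long overhead = 201_326_592L;  // jobmanager.memory.jvm-overhead.min/max (192 MiB)
        return heap + offHeap + metaspace + overhead;
    }

    public static void main(String[] args) {
        System.out.println(totalBytes() / MIB + " MiB"); // prints "1024 MiB"
    }
}
```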
 
