[ 
https://issues.apache.org/jira/browse/FLINK-26050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17780139#comment-17780139
 ] 

wuzq commented on FLINK-26050:
------------------------------

[~shenjiaqi] [~mayuehappy] Is there a solution to this problem? I set 
_state.backend.rocksdb.timer-service.factory_ to heap, but the number of small 
RocksDB sst files is still growing.
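
For reference, a minimal sketch of switching the timer service to heap (the 
option key and the HEAP value are the documented setting; the programmatic 
style assumes Flink 1.12+, everything else is illustrative):
{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// equivalent to putting the following line into flink-conf.yaml:
//   state.backend.rocksdb.timer-service.factory: HEAP
Configuration conf = new Configuration();
conf.setString("state.backend.rocksdb.timer-service.factory", "HEAP");
StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment(conf);
{code}
Note that switching the factory only affects where new timer state lives; it 
does not by itself compact away tombstone-only sst files that are already 
referenced by incremental checkpoints, which might explain why the small files 
keep accumulating.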

> Too many small sst files in rocksdb state backend when using processing time 
> window
> -----------------------------------------------------------------------------------
>
>                 Key: FLINK-26050
>                 URL: https://issues.apache.org/jira/browse/FLINK-26050
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.10.2, 1.14.3
>            Reporter: shen
>            Priority: Major
>         Attachments: image-2022-02-09-21-22-13-920.png, 
> image-2022-02-11-10-32-14-956.png, image-2022-02-11-10-36-46-630.png, 
> image-2022-02-14-13-04-52-325.png
>
>
> When using processing time windows, some workloads produce a lot of small sst 
> files (several KB each) in the RocksDB local directory, which may eventually 
> cause a "Too many open files" error.
> Using the rocksdb tool ldb to inspect the content of these sst files shows 
> (see also the RocksJava sketch below):
>  * the column family of these small sst files is "processing_window-timers".
>  * most of the sst files are in level-1.
>  * records in the sst files are almost all kTypeDeletion.
>  * the creation times of the sst files correspond to the checkpoint interval.
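> For anyone without ldb at hand, a similar inspection can be done from Java. 
> This is a hedged sketch, assuming a RocksJava version (for example the 6.x 
> line shipped with recent Flink releases) that exposes SstFileReader and the 
> deletion count in table properties; the file path is hypothetical:
> {code:java}
> import org.rocksdb.Options;
> import org.rocksdb.RocksDB;
> import org.rocksdb.RocksDBException;
> import org.rocksdb.SstFileReader;
> import org.rocksdb.TableProperties;
>
> public class SstInspect {
>   public static void main(String[] args) throws RocksDBException {
>     RocksDB.loadLibrary();
>     try (Options options = new Options();
>          SstFileReader reader = new SstFileReader(options)) {
>       reader.open("/path/to/rocksdb_local_db/000407.sst"); // hypothetical file
>       TableProperties props = reader.getTableProperties();
>       // for the files described above, nearly every entry is a deletion
>       System.out.println("entries=" + props.getNumEntries()
>           + ", deletions=" + props.getNumDeletions());
>     }
>   }
> }
> {code}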
> These small sst files seem to be generated whenever a Flink checkpoint is 
> triggered. Although all of their content consists of delete tags, they are 
> never compacted away: since they do not intersect with each other, RocksDB 
> simply moves them down a level unchanged (see [compaction trivial 
> move|https://github.com/facebook/rocksdb/wiki/Compaction-Trivial-Move]). And 
> because they are small and do not intersect with any other sst files, there 
> is no later opportunity to delete them either.
>  
> I will attach a simple program to reproduce the problem.
>  
> Timers in processing time windows are created and deleted in strictly 
> ascending order (both puts and deletes). So if the workload of a job happens 
> to generate level-0 sst files that do not intersect with each other (for 
> example: the processing window size is much smaller than the checkpoint 
> interval, and no window content or new data crosses a checkpoint boundary), 
> many small sst files will keep being generated until the job is restored 
> from a savepoint or incremental checkpointing is disabled.
>  
> A similar problem may exist when users register timers in other operators 
> under the same kind of workload, as in the sketch below.
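>  
> As a hedged illustration (not from the original report), the same pattern 
> with operator timers would look roughly like this, assuming a keyed stream 
> such as the one built in the reproduction program below; each element 
> registers a short-lived processing-time timer, so timer puts and deletes 
> again arrive in strictly ascending order:
> {code:java}
> stream.keyBy(x -> x)
>     .process(new KeyedProcessFunction<String, String, String>() {
>       @Override
>       public void processElement(String value, Context ctx, Collector<String> out) {
>         // ascending registration: current processing time plus a small delay
>         ctx.timerService().registerProcessingTimeTimer(
>             ctx.timerService().currentProcessingTime() + 100);
>       }
>       @Override
>       public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
>         // the timer entry is deleted once it fires, producing the delete tag
>         out.collect(ctx.getCurrentKey());
>       }
>     });
> {code}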
>  
> Code to reproduce the problem:
> {code:java}
> package org.apache.flink.jira;
> import lombok.extern.slf4j.Slf4j;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.configuration.RestOptions;
> import org.apache.flink.configuration.TaskManagerOptions;
> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
> import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
> import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> import org.apache.flink.util.Collector;
> import java.util.Collections;
> import java.util.List;
> import java.util.Random;
> @Slf4j
> public class StreamApp  {
>   public static void main(String[] args) throws Exception {
>     Configuration config = new Configuration();
>     config.set(RestOptions.ADDRESS, "127.0.0.1");
>     config.set(RestOptions.PORT, 10086);
>     config.set(TaskManagerOptions.NUM_TASK_SLOTS, 6);
>     new StreamApp().configureApp(StreamExecutionEnvironment.createLocalEnvironment(1, config));
>   }
>   public void configureApp(StreamExecutionEnvironment env) throws Exception {
>     env.enableCheckpointing(20000); // 20sec
>     RocksDBStateBackend rocksDBStateBackend =
>         new RocksDBStateBackend("file:///Users/shenjiaqi/Workspace/jira/flink-51/checkpoints/", true); // need to be reconfigured
>     rocksDBStateBackend.setDbStoragePath("/Users/shenjiaqi/Workspace/jira/flink-51/flink/rocksdb_local_db"); // need to be reconfigured
>     env.setStateBackend(rocksDBStateBackend);
>     env.getCheckpointConfig().setCheckpointTimeout(100000);
>     env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);
>     env.setParallelism(1);
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>     env.getConfig().setTaskCancellationInterval(10000);
>     for (int i = 0; i < 1; ++i) {
>       createOnePipeline(env);
>     }
>     env.execute("StreamApp");
>   }
>   private void createOnePipeline(StreamExecutionEnvironment env) {
>     // the data source is configured so that few windows cross a checkpoint interval
>     DataStreamSource<String> stream = env.addSource(new Generator(1, 3000, 3600));
>     stream.keyBy(x -> x)
>         // make sure the window size is less than the checkpoint interval; 100ms
>         // is very small, but a larger value should still reproduce the problem,
>         // just over a longer period of time
>         .window(TumblingProcessingTimeWindows.of(Time.milliseconds(100)))
>         .process(new ProcessWindowFunction<String, String, String, TimeWindow>() {
>           @Override
>           public void process(String s,
>               ProcessWindowFunction<String, String, String, TimeWindow>.Context context,
>               Iterable<String> elements, Collector<String> out) {
>             for (String ele: elements) {
>               out.collect(ele);
>             }
>           }
>         }).print();
>   }
>   public static final class Generator implements SourceFunction<String>, ListCheckpointed<Integer> {
>     private static final long serialVersionUID = -2819385275681175792L;
>     private final int numKeys;
>     private final int idlenessMs;
>     private final int recordsToEmit;
>     private volatile int numRecordsEmitted = 0;
>     private volatile boolean canceled = false;
>     Generator(final int numKeys, final int idlenessMs, final int durationSeconds) {
>       this.numKeys = numKeys;
>       this.idlenessMs = idlenessMs;
>       this.recordsToEmit = ((durationSeconds * 1000) / idlenessMs) * numKeys;
>     }
>     @Override
>     public void run(final SourceContext<String> ctx) throws Exception {
>       Random rnd = new Random();
>       while (numRecordsEmitted < recordsToEmit) {
>         synchronized (ctx.getCheckpointLock()) {
>           for (int i = 0; i < numKeys; i++) {
>             ctx.collect("" + rnd.nextInt(1));
>             numRecordsEmitted++;
>           }
>         }
>         Thread.sleep(idlenessMs);
>       }
>       while (!canceled) {
>         Thread.sleep(50);
>       }
>     }
>     @Override
>     public void cancel() {
>       canceled = true;
>     }
>     @Override
>     public List<Integer> snapshotState(final long checkpointId, final long timestamp) {
>       return Collections.singletonList(numRecordsEmitted);
>     }
>     @Override
>     public void restoreState(final List<Integer> states) {
>       for (final Integer state : states) {
>         numRecordsEmitted += state;
>       }
>     }
>   }
> }
>  {code}
>  
>  Code to simulate Flink checkpointing together with timer creation and 
> deletion, and reproduce the problem:
> {code:cpp}
> //
> //  main.cpp
> //  reproduce
> //
> //  Created by shenjiaqi on 2022/2/8.
> //
> #include <iostream>
> #include <filesystem>
> #include <cstdio>
> #include <cstdlib>
> #include <string>
> #include "rocksdb/utilities/checkpoint.h"
> #include "rocksdb/db.h"
> #include "rocksdb/slice.h"
> #include "rocksdb/options.h"
> using namespace ROCKSDB_NAMESPACE;
> using ROCKSDB_NAMESPACE::DB;
> using ROCKSDB_NAMESPACE::Options;
> using ROCKSDB_NAMESPACE::PinnableSlice;
> using ROCKSDB_NAMESPACE::ReadOptions;
> using ROCKSDB_NAMESPACE::Status;
> using ROCKSDB_NAMESPACE::WriteBatch;
> using ROCKSDB_NAMESPACE::WriteOptions;
> std::string kDBPath = "/Users/shenjiaqi/Workspace/flink/jira/data-test"; // need to be reconfigured
> static void createCheckpoint(rocksdb::DB *db, rocksdb::Status &s) {
>     std::cout << "create checkpoint" << std::endl;
>     std::string chkPath = kDBPath + "-chp";
>     // just in case; note std::string::find returns the unsigned value
>     // std::string::npos when the substring is missing, never a negative number
>     assert(chkPath.find("/Users/shenjiaqi/Workspace/flink/jira/") != std::string::npos);
>     system(("rm -rf " + chkPath).data()); // use with care.
>     
>     Checkpoint* checkpoint_ptr;
>     s = Checkpoint::Create(db, &checkpoint_ptr);
>     assert(s.ok());
>     
>     s = checkpoint_ptr->CreateCheckpoint(chkPath);
>     assert(s.ok());
> }
> int main() {
>     DB* db;
>     Options options;
>     // Optimize RocksDB. This is the easiest way to get RocksDB to perform well
>     options.IncreaseParallelism();
>     options.OptimizeLevelStyleCompaction();
>     // create the DB if it's not already present
>     options.create_if_missing = true;
>     options.info_log_level = DEBUG_LEVEL;
> //    options.level_compaction_dynamic_level_bytes = true;
>     // open DB
>     Status s = DB::Open(options, kDBPath, &db);
>     assert(s.ok());
>     for (int i = 0; i < 1000; ++i) {
>         std::string key = "key" + /* std::to_string((int)rand()); // */ std::to_string(i);
>         std::string value = "value" + std::to_string(i);
>         // Put key-value
>         s = db->Put(WriteOptions(), key, value);
>         assert(s.ok());
>         // delete after put
>         s = db->Delete(WriteOptions(), key);
>         assert(s.ok());
>         if (i > 0 && (i % 5) == 0) {
>             createCheckpoint(db, s);
>         }
>     }
>     
>     createCheckpoint(db, s);
>     return 0;
> }
> {code}
> Many log lines such as "Moving #407 to level-1 1047 bytes" can be found in 
> the RocksDB LOG file (not enabled in Flink by default).
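> To see these lines from inside a Flink job, the RocksDB info log can be 
> turned on through a custom options factory. A minimal sketch against the 
> Flink 1.14 RocksDBOptionsFactory interface; the log directory is illustrative:
> {code:java}
> import java.util.Collection;
> import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;
> import org.rocksdb.ColumnFamilyOptions;
> import org.rocksdb.DBOptions;
> import org.rocksdb.InfoLogLevel;
>
> public class VerboseRocksDBLog implements RocksDBOptionsFactory {
>   @Override
>   public DBOptions createDBOptions(DBOptions current, Collection<AutoCloseable> handlesToClose) {
>     // write the RocksDB LOG (including trivial-move messages) to a visible dir
>     return current.setInfoLogLevel(InfoLogLevel.DEBUG_LEVEL)
>                   .setDbLogDir("/tmp/rocksdb-logs"); // illustrative path
>   }
>   @Override
>   public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions current, Collection<AutoCloseable> handlesToClose) {
>     return current;
>   }
> }
> // hook it up in configureApp():
> //   rocksDBStateBackend.setRocksDBOptions(new VerboseRocksDBLog());
> {code}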



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
