[ https://issues.apache.org/jira/browse/FLINK-26050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17780139#comment-17780139 ]
wuzq commented on FLINK-26050: ------------------------------ [~shenjiaqi] [~mayuehappy] Is there a solution to this problem? I set _state.backend.rocksdb.timer-service.factory_ to heap, but the number of small rocksdb sst files is still growing. > Too many small sst files in rocksdb state backend when using processing time > window > ----------------------------------------------------------------------------------- > > Key: FLINK-26050 > URL: https://issues.apache.org/jira/browse/FLINK-26050 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends > Affects Versions: 1.10.2, 1.14.3 > Reporter: shen > Priority: Major > Attachments: image-2022-02-09-21-22-13-920.png, > image-2022-02-11-10-32-14-956.png, image-2022-02-11-10-36-46-630.png, > image-2022-02-14-13-04-52-325.png > > > When using a processing time window, under some workloads there will be a lot of > small sst files (several KB each) in the rocksdb local directory, which may cause a "Too > many files" error. > Using the rocksdb tool ldb to inspect the content of the sst files shows: > * the column family of these small sst files is "processing_window-timers". > * most sst files are in level-1. > * the records in the sst files are almost all kTypeDeletion. > * the creation times of the sst files correspond to the checkpoint interval. > These small sst files seem to be generated when a flink checkpoint is > triggered. Although all the content in these sst files is delete tags, they are not > compacted and deleted by rocksdb compaction because they do not intersect with > each other (see rocksdb [compaction trivial > move|https://github.com/facebook/rocksdb/wiki/Compaction-Trivial-Move]). And > there seems to be no chance to delete them, because they are small and do not > intersect with other sst files. > > I will attach a simple program to reproduce the problem. > > Timers in a processing time window are generated in strictly ascending > order (both put and delete). 
So if the workload of the job happens to generate level-0 > sst files that do not intersect with each other (for example: the processing window size is > much smaller than the checkpoint interval, and no window content crosses the checkpoint > interval or no new data arrives in a window crossing the checkpoint interval), there will > be many small sst files generated until the job is restored from a savepoint, or > incremental checkpointing is disabled. > > A similar problem may exist when users use timers in operators with the same > workload. > > Code to reproduce the problem: > {code:java} > package org.apache.flink.jira; > import lombok.extern.slf4j.Slf4j; > import org.apache.flink.configuration.Configuration; > import org.apache.flink.configuration.RestOptions; > import org.apache.flink.configuration.TaskManagerOptions; > import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; > import org.apache.flink.streaming.api.TimeCharacteristic; > import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; > import org.apache.flink.streaming.api.datastream.DataStreamSource; > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import org.apache.flink.streaming.api.functions.source.SourceFunction; > import > org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; > import > org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; > import org.apache.flink.streaming.api.windowing.time.Time; > import org.apache.flink.streaming.api.windowing.windows.TimeWindow; > import org.apache.flink.util.Collector; > import java.util.Collections; > import java.util.List; > import java.util.Random; > @Slf4j > public class StreamApp { > public static void main(String[] args) throws Exception { > Configuration config = new Configuration(); > config.set(RestOptions.ADDRESS, "127.0.0.1"); > config.set(RestOptions.PORT, 10086); > config.set(TaskManagerOptions.NUM_TASK_SLOTS, 6); > new > StreamApp().configureApp(StreamExecutionEnvironment.createLocalEnvironment(1, > 
config)); > } > public void configureApp(StreamExecutionEnvironment env) throws Exception { > env.enableCheckpointing(20000); // 20sec > RocksDBStateBackend rocksDBStateBackend = > new > RocksDBStateBackend("file:///Users/shenjiaqi/Workspace/jira/flink-51/checkpoints/", > true); // need to be reconfigured > > rocksDBStateBackend.setDbStoragePath("/Users/shenjiaqi/Workspace/jira/flink-51/flink/rocksdb_local_db"); > // need to be reconfigured > env.setStateBackend(rocksDBStateBackend); > env.getCheckpointConfig().setCheckpointTimeout(100000); > env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5); > env.setParallelism(1); > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); > env.getConfig().setTaskCancellationInterval(10000); > for (int i = 0; i < 1; ++i) { > createOnePipeline(env); > } > env.execute("StreamApp"); > } > private void createOnePipeline(StreamExecutionEnvironment env) { > // data source is configured so that little window cross checkpoint > interval > DataStreamSource<String> stream = env.addSource(new Generator(1, 3000, > 3600)); > stream.keyBy(x -> x) > // make sure window size less than checkpoint interval. though 100ms > is too small, I think increase this value can still reproduce the problem > with longer time. 
> .window(TumblingProcessingTimeWindows.of(Time.milliseconds(100))) > .process(new ProcessWindowFunction<String, String, String, > TimeWindow>() { > @Override > public void process(String s, ProcessWindowFunction<String, String, > String, TimeWindow>.Context context, > Iterable<String> elements, Collector<String> out) { > for (String ele: elements) { > out.collect(ele); > } > } > }).print(); > } > public static final class Generator implements SourceFunction<String>, > ListCheckpointed<Integer> { > private static final long serialVersionUID = -2819385275681175792L; > private final int numKeys; > private final int idlenessMs; > private final int recordsToEmit; > private volatile int numRecordsEmitted = 0; > private volatile boolean canceled = false; > Generator(final int numKeys, final int idlenessMs, final int > durationSeconds) { > this.numKeys = numKeys; > this.idlenessMs = idlenessMs; > this.recordsToEmit = ((durationSeconds * 1000) / idlenessMs) * numKeys; > } > @Override > public void run(final SourceContext<String> ctx) throws Exception { > Random rnd = new Random(); > while (numRecordsEmitted < recordsToEmit) { > synchronized (ctx.getCheckpointLock()) { > for (int i = 0; i < numKeys; i++) { > ctx.collect("" + rnd.nextInt(1)); > numRecordsEmitted++; > } > } > Thread.sleep(idlenessMs); > } > while (!canceled) { > Thread.sleep(50); > } > } > @Override > public void cancel() { > canceled = true; > } > @Override > public List<Integer> snapshotState(final long checkpointId, final long > timestamp) { > return Collections.singletonList(numRecordsEmitted); > } > @Override > public void restoreState(final List<Integer> states) { > for (final Integer state : states) { > numRecordsEmitted += state; > } > } > } > } > {code} > > Code to simulate flink checkpointing and timer creation and deletion and > reproduce the problem: > {code:cpp} > // > // main.cpp > // reproduce > // > // Created by shenjiaqi on 2022/2/8. 
> // > #include <iostream> > #include <filesystem> > #include <cstdio> > #include <cstdlib> > #include <string> > #include "rocksdb/utilities/checkpoint.h" > #include "rocksdb/db.h" > #include "rocksdb/slice.h" > #include "rocksdb/options.h" > using namespace ROCKSDB_NAMESPACE; > using ROCKSDB_NAMESPACE::DB; > using ROCKSDB_NAMESPACE::Options; > using ROCKSDB_NAMESPACE::PinnableSlice; > using ROCKSDB_NAMESPACE::ReadOptions; > using ROCKSDB_NAMESPACE::Status; > using ROCKSDB_NAMESPACE::WriteBatch; > using ROCKSDB_NAMESPACE::WriteOptions; > std::string kDBPath = "/Users/shenjiaqi/Workspace/flink/jira/data-test"; // > need to be reconfigured > static void createCheckpoint(rocksdb::DB *db, rocksdb::Status &s) { > std::cout << "create checkpoint" << std::endl; > std::string chkPath = kDBPath + "-chp"; > assert(chkPath.find("/Users/shenjiaqi/Workspace/flink/jira/") >= 0); // > just in case > system(("rm -rf " + chkPath).data()); // use with care. > > Checkpoint* checkpoint_ptr; > s = Checkpoint::Create(db, &checkpoint_ptr); > assert(s.ok()); > > s = checkpoint_ptr->CreateCheckpoint(chkPath); > assert(s.ok()); > } > int main() { > DB* db; > Options options; > // Optimize RocksDB. 
This is the easiest way to get RocksDB to perform > well > options.IncreaseParallelism(); > options.OptimizeLevelStyleCompaction(); > // create the DB if it's not already present > options.create_if_missing = true; > options.info_log_level = DEBUG_LEVEL; > // options.level_compaction_dynamic_level_bytes = true; > // open DB > Status s = DB::Open(options, kDBPath, &db); > assert(s.ok()); > for (int i = 0; i < 1000; ++i) { > std::string key = "key" + /* std::to_string((int)rand()); // > */std::to_string(i); > std::string value = "value" + std::to_string(i); > // Put key-value > s = db->Put(WriteOptions(), key, value); > assert(s.ok()); > // delete after put > s = db->Delete(WriteOptions(), key); > assert(s.ok()); > if (i > 0 && (i % 5) == 0) { > createCheckpoint(db, s); > } > } > > createCheckpoint(db, s); > return 0; > } > {code} > Many log entries such as "Moving #407 to level-1 1047 bytes" can be found in the LOG file of > rocksdb (not enabled in flink by default). -- This message was sent by Atlassian Jira (v8.20.10#820010)