Re: MapState bad performance
Hi Nick,

Sorry for jumping in late. I'm just wondering why you call RocksDBMapState#putAll() and then follow it with RocksDBMapState#clear(). It seems the state will always be empty after each element is processed.

Best,
Congxian

On Tue, Jun 16, 2020 at 7:42 PM, Yun Tang wrote:

> Hi Nick
>
> From my experience, it's not easy to tune this without code to reproduce it.
> Could you please share code with a fake source that reproduces the problem, so that we
> can help you?
>
> If CPU usage is 100% in RocksDB-related methods, it might be because we
> access RocksDB too often. If CPU usage is not 100% while disk utilization is
> 100%, we have probably hit the performance limit of the disk.
>
> BTW, if you have a TM with 16GB memory and 32 slots, it would only give about
> 150MB of managed memory [1][2] for RocksDB, which looks a bit small.
>
> [1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/memory/mem_setup.html#managed-memory
> [2] https://ci.apache.org/projects/flink/flink-docs-stable/ops/memory/mem_tuning.html#rocksdb-state-backend
>
> Best
> Yun Tang
>
> --
> *From:* nick toker
> *Sent:* Tuesday, June 16, 2020 18:36
> *To:* Yun Tang
> *Cc:* user@flink.apache.org
> *Subject:* Re: MapState bad performance
>
> Hi,
>
> We are using Flink version 1.10.1.
> The task manager memory is 16GB.
> The number of slots is 32, but the job parallelism is 1.
> We used the default configuration for RocksDB.
> We checked the disk speed on the machine running the task manager: write
> 300MB and read 1GB.
>
> BR,
> Nick
>
> On Tue, Jun 16, 2020 at 12:12, Yun Tang <myas...@live.com> wrote:
>
> Hi Nick
>
> As you might know, RocksDB does not perform well for
> iterator-like operations because it needs to merge-sort across multiple levels. [1]
>
> Unfortunately, rocksDBMapState.isEmpty() needs to call iterator and seek
> operations on RocksDB [2], and rocksDBMapState.clear() needs to iterate
> over the state and remove each entry [3].
> However, even though these operations do not perform well, I don't think they
> would perform extremely badly in the general case. From our experience on SSDs, the
> latency of a seek should typically be less than 100us,
> though it can go up to hundreds of us. Did you use an SSD?
>
>    1. What are the Flink version, TaskManager memory, number of slots, and
>       RocksDB-related configurations?
>    2. Have you checked the IOPS and disk utilization of the machines
>       hosting the task managers that run RocksDB?
>
> [1] https://github.com/facebook/rocksdb/wiki/Iterator-Implementation
> [2] https://github.com/apache/flink/blob/efd497410ced3386b955a92b731a8e758223045f/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java#L241
> [3] https://github.com/apache/flink/blob/efd497410ced3386b955a92b731a8e758223045f/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java#L254
>
> Best
> Yun Tang
>
> --
> *From:* nick toker
> *Sent:* Tuesday, June 16, 2020 15:35
> *To:* user@flink.apache.org
> *Subject:* MapState bad performance
>
> Hello,
>
> We wrote a very simple streaming pipeline containing:
> 1. Kafka consumer
> 2. Process function
> 3. Kafka producer
>
> The code of the process function is listed below:
>
>     private transient MapState testMapState;
>
>     @Override
>     public void processElement(Map value, Context ctx,
>             Collector out) throws Exception {
>
>         if (testMapState.isEmpty()) {
>
>             testMapState.putAll(value);
>
>             out.collect(value);
>
>             testMapState.clear();
>         }
>     }
>
> We faced very bad performance and then ran some tests using JProfiler.
> Using JProfiler, we saw that the hot spots are two functions of the MapState:
> 1. isEmpty() - around 7 ms
> 2. clear() - around 4 ms
>
> We had to change to ValueState instead.
>
> Are we using the MapState in the correct way, or are we doing something
> wrong?
> Is this behaviour expected, given that the Flink recommendation is to use
> MapState and NOT ValueState?
>
> BR,
> Nick
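Congxian's observation at the top of this thread (putAll() followed by clear() leaves the state empty every time) can be checked without Flink. The following is a minimal sketch, not the poster's actual code: a HashMap stands in for MapState, a List stands in for the Collector, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java stand-in for the process function quoted in this thread.
// HashMap replaces Flink's MapState; all names here are hypothetical.
final class AlwaysEmptyStateDemo {
    private final Map<String, String> testMapState = new HashMap<>(); // stand-in for MapState
    private final List<Map<String, String>> emitted = new ArrayList<>(); // stand-in for Collector
    private int trueEmptyChecks = 0;

    void processElement(Map<String, String> value) {
        if (testMapState.isEmpty()) {
            trueEmptyChecks++;
            testMapState.putAll(value);
            emitted.add(value);
            testMapState.clear(); // the state is empty again before the next element arrives
        }
    }

    int emittedCount() { return emitted.size(); }
    int trueEmptyChecks() { return trueEmptyChecks; }
    int stateSize() { return testMapState.size(); }

    public static void main(String[] args) {
        AlwaysEmptyStateDemo demo = new AlwaysEmptyStateDemo();
        for (int i = 0; i < 5; i++) {
            demo.processElement(Map.of("k" + i, "v" + i));
        }
        System.out.println("elements emitted: " + demo.emittedCount());
        System.out.println("isEmpty() was true: " + demo.trueEmptyChecks() + " times");
        System.out.println("entries left in state: " + demo.stateSize());
    }
}
```

In this model the isEmpty() guard never skips an element, because the map is cleared in the same call that fills it. The guard and the state therefore add only the isEmpty() and clear() costs discussed in the thread.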
Re: Trouble with large state
Hi,

Sorry to jump in late. After reading the previous emails, I have the following assumptions; please correct me if I'm wrong:

- RocksDBStateBackend with incremental checkpoints
- at-least-once mode
- the parallelism of the stateful operator is 8
- checkpoints may take too long to complete
- the input has a fixed rate, enforced by a throttler

From the latest email, the checkpoint size grows from start to end, and the e2e time grows as well.

From my side: the e2e checkpoint time depends on the e2e snapshot time of all operators. The e2e snapshot time of an operator depends on ${barrier_align_time} + ${sync-snapshot-time} + ${async-snapshot-time}. For at-least-once mode, you can enable debug logging to track the barrier alignment time. You can find out which step is the bottleneck, and track one task to find out the reason.

Maybe you could try:

1. Use full snapshots with RocksDBStateBackend (disable incremental checkpoints) and see what the e2e checkpoint time will be. This is to verify whether there are too many incremental changes between checkpoints.
2. Place the timers on heap and in RocksDB, and see whether this affects the checkpoint time. Timers on heap affect the sync-snapshot time.
3. Find out whether there is an I/O or disk problem when snapshotting.
4. Find out whether there is a network problem when snapshotting.
5. Check whether uploading the state using multiple threads [1] helps here.

[1] https://issues.apache.org/jira/browse/FLINK-11008

Best,
Congxian

On Sun, Jun 21, 2020 at 2:46 AM, Jeff Henrikson wrote:

> Bhaskar,
>
> > Glad to know some progress.
>
> Yeah, some progress. Yet the overnight run didn't look as good as I hoped.
>
> The throttling required to not crash during snapshots seems to be quite
> different from the throttling required to not crash outside of snapshots.
> So the lowest common denominator is quite a large performance penalty.
>
> What's worse, the rate of input that makes the snapshot performance go
> from good to bad seems to change significantly as the state size grows.
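The point that checkpoint size and end-to-end time both grow can be made concrete with the "GB/minute" figure used in this thread. The sketch below is a worked calculation, assuming the State Size column of the reported checkpoint history is taken at face value and divided by the End to End Duration; the class and method names are hypothetical, and the four input rows are copied from that history.

```java
import java.util.Locale;

// Derives "GB/minute" from rows of the checkpoint history reported in this thread.
// Class and method names are hypothetical; the numbers are copied from the table.
final class CheckpointRate {

    /** State size in GB divided by the end-to-end duration in minutes. */
    static double gbPerMinute(double stateSizeGb, int hours, int minutes, int seconds) {
        double totalMinutes = hours * 60.0 + minutes + seconds / 60.0;
        return stateSizeGb / totalMinutes;
    }

    public static void main(String[] args) {
        // Columns: checkpoint id, state size (GB), hours, minutes, seconds
        double[][] rows = {
            {4, 12.7, 0, 0, 56},
            {6, 17.4, 0, 11, 11},
            {9, 34.1, 0, 43, 35},
            {12, 60.5, 1, 44, 55},
        };
        for (double[] r : rows) {
            double rate = gbPerMinute(r[1], (int) r[2], (int) r[3], (int) r[4]);
            System.out.printf(Locale.ROOT, "checkpoint %d: %.2f GB/min%n", (int) r[0], rate);
        }
    }
}
```

By this measure the rate falls from roughly 13.6 GB/min for checkpoint 4 to roughly 0.58 GB/min for checkpoint 12, which is the drastic variation described in the thread.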
Re: Trouble with large state
Bhaskar,

> Glad to know some progress.

Yeah, some progress. Yet the overnight run didn't look as good as I hoped.

The throttling required to not crash during snapshots seems to be quite different from the throttling required to not crash outside of snapshots. So the lowest common denominator is quite a large performance penalty.

What's worse, the rate of input that makes the snapshot performance go from good to bad seems to change significantly as the state size grows. Here is the checkpoint history from an overnight run.

Parameters:

- 30 minutes minimum between snapshots
- incremental snapshot mode
- inputs throttled to 100 events per sec per input per slot, which is around 1/4 of the unthrottled throughput

Checkpoint history:

ID  Status     Acknowledged  Trigger Time  Latest Acknowledgement  End to End Duration  State Size  Buffered During Alignment
12  COMPLETED  304/304       8:52:22       10:37:18                1h 44m 55s           60.5 GB     0 B
11  COMPLETED  304/304       6:47:03       8:22:19                 1h 35m 16s           53.3 GB     0 B
10  COMPLETED  304/304       5:01:20       6:17:00                 1h 15m 39s           41.0 GB     0 B
9   COMPLETED  304/304       3:47:43       4:31:19                 43m 35s              34.1 GB     0 B
8   COMPLETED  304/304       2:40:58       3:17:42                 36m 43s              27.8 GB     0 B
7   COMPLETED  304/304       1:39:15       2:10:57                 31m 42s              23.1 GB     0 B
6   COMPLETED  304/304       0:58:02       1:09:13                 11m 11s              17.4 GB     0 B
5   COMPLETED  304/304       0:23:27       0:28:01                 4m 33s               14.3 GB     0 B
4   COMPLETED  304/304       23:52:29      23:53:26                56s                  12.7 GB     0 B
3   COMPLETED  304/304       23:20:59      23:22:28                1m 29s               10.8 GB     0 B
2   COMPLETED  304/304       22:46:17      22:50:58                4m 40s               7.40 GB     0 B

As you can see, GB/minute varies drastically. GB/minute also varies drastically with full checkpoint mode.

I'm pleased that it hasn't crashed yet. Yet I'm concerned that with the checkpoint GB/minute getting so slow, it will crash soon.

I'm really wishing state.backend.async=false worked for RocksDBStateBackend.

I'm also wondering if my throttler would improve if I just connected to the REST API to ask if any checkpoint is in progress, and then paused inputs accordingly.
Effectively state.backend.async=false via hacked application code.

> Where are you updating your state here? I
> couldn't find any flink managed state here.

The only updates to state I make are through the built-in DataStream.coGroup. A unit test (without RocksDB loaded) of the way I use .coGroup shows exactly two ways that .coGroup calls an implementation of AppendingState.add. I summarize those below.

The two AppendingState subclasses invoked are HeapListState and HeapReducingState. Neither has a support annotation on it, such as MapState's @PublicEvolving.

> I suggested updating the flink managed state using onTimer over an
> interval equal to the checkpoint interval.

So the onTimer method, with the interval set to the checkpoint interval. Interesting.

It looks like the closest subclass for my use case would be KeyedCoProcessFunction. Let me see if I understand the idea concretely:

1) Between checkpoints, read join input and write join output, by loading any state reads from external state, but buffering all state changes in memory in some kind of data structure.

2) Whenever a checkpoint arrives or the memory consumed by buffered writes gets too big, flush the writes to state.

Is that the gist of the idea about .onTimer?
Jeff

There are two paths from .coGroup to AppendingState.add

path 1 of 2: .coGroup to HeapListState

    add:90, HeapListState {org.apache.flink.runtime.state.heap}
    processElement:203, EvictingWindowOperator {org.apache.flink.streaming.runtime.operators.windowing}
    processElement:164, StreamOneInputProcessor {org.apache.flink.streaming.runtime.io}
    processInput:143, StreamOneInputProcessor {org.apache.flink.streaming.runtime.io}

org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator#processElement

(windowAssigner is an instance of GlobalWindows)

    @Override
    public void processElement(StreamRecord element) throws Exception {
        final Collection elementWindows = windowAssigner.assignWindows(
                element.getValue(), element.getTimestamp(), windowAssignerContext);

        //if element is handled by none of assigned elementWindows
        boolean isSkippedElement = true;

        final K key = this.getKeyedStateBackend().getCurrentKey();

        if (windowAssigner instanceof MergingWindowAssigner) {
            . . .
        } else {
            for (W window : elementWindows) {

                // check if the window is already inactive
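The two-step idea restated in the message above (buffer state changes in memory between checkpoints, then flush on a timer or when the buffer gets too big) can be sketched in plain Java. This is only an illustration of the buffering pattern under stated assumptions, not Flink code: a Map stands in for Flink managed state, onTimer() stands in for a timer callback, the size limit counts entries rather than bytes, and every name is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of "buffer writes in memory, flush on a timer or when the buffer is full".
// backingState stands in for Flink managed state; all names are hypothetical.
// Null values and deletes are not handled in this sketch.
final class BufferedStateWriter<K, V> {
    private final Map<K, V> backingState;
    private final Map<K, V> pendingWrites = new HashMap<>();
    private final int maxPendingEntries;
    private int flushCount = 0;

    BufferedStateWriter(Map<K, V> backingState, int maxPendingEntries) {
        this.backingState = backingState;
        this.maxPendingEntries = maxPendingEntries;
    }

    /** Reads see buffered writes first, then fall back to the backing state. */
    V get(K key) {
        V pending = pendingWrites.get(key);
        return pending != null ? pending : backingState.get(key);
    }

    /** Buffers the write and flushes early once the buffer reaches its limit. */
    void put(K key, V value) {
        pendingWrites.put(key, value);
        if (pendingWrites.size() >= maxPendingEntries) {
            flush();
        }
    }

    /** Intended to be called periodically, e.g. once per checkpoint interval. */
    void onTimer() {
        flush();
    }

    private void flush() {
        if (pendingWrites.isEmpty()) {
            return;
        }
        backingState.putAll(pendingWrites);
        pendingWrites.clear();
        flushCount++;
    }

    int pendingCount() { return pendingWrites.size(); }
    int flushCount() { return flushCount; }

    public static void main(String[] args) {
        Map<String, Integer> state = new HashMap<>();
        BufferedStateWriter<String, Integer> writer = new BufferedStateWriter<>(state, 3);
        writer.put("a", 1);
        writer.put("b", 2);
        System.out.println("state entries before flush: " + state.size());
        writer.onTimer();
        System.out.println("state entries after flush: " + state.size());
    }
}
```

Writes that are still in the buffer are not yet part of the backing map, so a real implementation would have to make sure they are flushed before a snapshot is taken, which is the "whenever a checkpoint arrives" condition in step 2.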
Re: Difference between flink on kubernetes operator vs native kubernetes
Inline comments for your questions.

> Is it similar to the first? How the second is different?

The first leverages a K8s operator to make running a standalone job cluster on K8s easier. The second, however, is more Flink-native. We have an embedded K8s client in Flink and can use "kubernetes-session.sh"/"flink" to start session/job clusters. The biggest difference is dynamic resource allocation: all the TaskManager pods are allocated and released dynamically.

> How can we handle multi-tenancy in the second?

The job cluster (now named "application" mode) will be supported in 1.11 [1]. It could provide better isolation. Each application can be submitted with a specified ServiceAccount, namespace, and resources.

> And since the second is still in the beta stage, what is the roadmap or the
> features I can expect from the second?

Native Flink K8s integration was introduced in 1.10 and enriched with many features in 1.11. After 1.11, the interfaces (including the command line, config options, etc.) will not change easily, and I think it could be used in production for some cases. However, we still have some advanced features to complete (e.g. native K8s HA, pod templates, volume support). I hope they can be done in the next major release cycle (1.12). Feel free to share your feedback.

BTW, we also gave a detailed introduction, "Integrate Flink with Kubernetes natively", at the last Flink Forward. You can find the video here [2].

[1] https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html#flink-kubernetes-application
[2] https://youtu.be/pdFPr_VOWTU

Best,
Yang

On Sat, Jun 20, 2020 at 10:43 AM, SAMPAD SAHA wrote:

> I was trying to deploy Flink in a Kubernetes environment and came across two
> things:
>
> 1. Kubernetes Flink control planes developed by Google and Lyft
>    - https://github.com/lyft/flinkk8soperator
>    - https://github.com/GoogleCloudPlatform/flink-on-k8s-operator
>
> 2. Deploying on Kubernetes natively.
>    - https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/native_kubernetes.html
>
> I would like to know the basic difference between the underlying working
> models of the two.
>
> I am able to understand the working structure of the first as well as its
> scope and features.
> But for the second:
> - Is it similar to the first? How is the second different?
> - How can we handle multi-tenancy in the second?
> - And since the second is still in the beta stage, what is the
>   roadmap, or what features can I expect from it?