Re: MapState bad performance

2020-06-20 Thread Congxian Qiu
Hi Nick

Sorry for the late jump in.

Just wondering why you call putAll on the RocksDBMapState and then immediately
follow it with RocksDBMapState#clear(). It seems the state will always be empty
after processing.

Best,
Congxian


Yun Tang  wrote on Tue, Jun 16, 2020 at 7:42 PM:

> Hi Nick
>
> From my experience, it's not easy to tune this without code that reproduces
> the problem. Could you please share code with a fake source that reproduces it
> so that we can help you?
>
> If CPU usage is 100% in RocksDB-related methods, it might be because we
> access RocksDB too often. If the CPU usage is not 100% while disk util is
> 100%, we have probably hit the performance limit of the disk.
>
> BTW, with a 16GB-memory TM and 32 slots, each slot would only get about
> 150MB of managed memory [1][2] for RocksDB, which looks a bit small.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/memory/mem_setup.html#managed-memory
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/memory/mem_tuning.html#rocksdb-state-backend
>
> Best
> Yun Tang
>
>
> --
> *From:* nick toker 
> *Sent:* Tuesday, June 16, 2020 18:36
> *To:* Yun Tang 
> *Cc:* user@flink.apache.org 
> *Subject:* Re: MapState bad performance
>
> Hi,
>
> We are using flink version 1.10.1
> The task manager memory 16GB
> The number of slots is 32 but the job parallelism is 1.
> We used the default configuration for rocksdb.
> We checked the disk speed on the machine running the task manager: write
> 300MB/s and read 1GB/s.
>
> BR,
> Nick
>
> On Tue, Jun 16, 2020 at 12:12 Yun Tang <myas...@live.com> wrote:
>
> Hi Nick
>
> As you might know, RocksDB does not perform well for iterator-like
> operations because it needs to merge-sort across multiple levels. [1]
>
> Unfortunately, rocksDBMapState.isEmpty() needs to create an iterator and seek
> over RocksDB [2], and rocksDBMapState.clear() needs to iterate over the state
> and remove each entry [3].
> However, even though these operations do not perform well, I don't think they
> should behave extremely badly in the general case. From our experience on SSD,
> the latency of a seek should be less than 100us and could go up to hundreds of
> us. Did you use an SSD disk?
>
>1. What is the Flink version, taskmanager memory, number of slots and
>RocksDB related configurations?
>    2. Have you checked the IOPS and disk util on the machines where the
>    task managers running RocksDB are located?
>
>
> [1] https://github.com/facebook/rocksdb/wiki/Iterator-Implementation
> [2]
> https://github.com/apache/flink/blob/efd497410ced3386b955a92b731a8e758223045f/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java#L241
> [3]
> https://github.com/apache/flink/blob/efd497410ced3386b955a92b731a8e758223045f/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java#L254
>
> Best
> Yun Tang
>
> --
> *From:* nick toker 
> *Sent:* Tuesday, June 16, 2020 15:35
> *To:* user@flink.apache.org 
> *Subject:* MapState bad performance
>
> Hello,
>
> We wrote a very simple streaming pipeline containing:
> 1. Kafka consumer
> 2. Process function
> 3. Kafka producer
>
> The code of the process function is listed below:
>
> private transient MapState<String, String> testMapState;
>
> @Override
> public void processElement(Map<String, String> value, Context ctx,
>         Collector<Map<String, String>> out) throws Exception {
>
>     // isEmpty() and clear() turned out to be the hot spots (see below).
>     if (testMapState.isEmpty()) {
>         testMapState.putAll(value);
>         out.collect(value);
>         testMapState.clear();
>     }
> }
>
> We faced very bad performance and then we made some tests using jprofiler.
> Using jprofiler, we saw that the hot spots are 2 functions of the MapState:
> 1. isEmpty() - around 7 ms
> 2. clear() - around 4 ms
>
> We had to change and use ValueState instead.
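> Roughly, the ValueState variant we switched to looks like the sketch below
> (the map types here are placeholders, not necessarily our real ones):
>
> private transient ValueState<Map<String, String>> testValueState;
>
> @Override
> public void processElement(Map<String, String> value, Context ctx,
>         Collector<Map<String, String>> out) throws Exception {
>
>     // ValueState.value() is a single point lookup, so the emptiness check
>     // avoids the iterator/seek cost of MapState.isEmpty().
>     if (testValueState.value() == null) {
>         testValueState.update(value);
>         out.collect(value);
>         testValueState.clear();
>     }
> }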
>
> Are we using the MapState in the correct way, or are we doing something
> wrong?
> Is this behaviour expected, given that Flink's recommendations are to use
> MapState and NOT ValueState?
>
> BR,
> Nick
>
>


Re: Trouble with large state

2020-06-20 Thread Congxian Qiu
Hi

Sorry to jump in late.
After reading the previous emails, I have the following assumptions; please
correct me if I'm wrong:
- RocksDBStateBackend with incremental checkpoints
- at-least-once mode
- the parallelism of the stateful operator is 8
- checkpoints may take too long to complete
- the input rate is fixed by using a throttler.

From the latest email, the checkpoint size grows from start to end, and the
e2e time grows as well.

From my side:
The e2e checkpoint time depends on the e2e snapshot time of all operators, and
the e2e snapshot time of an operator depends on
${barrier_align_time} + ${sync_snapshot_time} + ${async_snapshot_time}.
For at-least-once mode, you can enable debug logging to track the barrier
alignment time.

You can find out which step is the bottleneck, and then track a single task to
find the reason.

Maybe you could try the following (a configuration sketch follows the link
below):
1. use a full snapshot with RocksDBStateBackend (disable incremental
checkpoints) and see what the e2e checkpoint time becomes -- this verifies
whether there are too many incremental changes between checkpoints.
2. place the timers on Heap vs. RocksDB and see whether this affects the
checkpoint time -- timers on heap affect the sync snapshot time.
3. find out whether there is an io/disk problem while snapshotting.
4. find out whether there is a network problem while snapshotting.
5. check whether uploading the state using multiple threads [1] helps here.


[1] https://issues.apache.org/jira/browse/FLINK-11008
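A minimal sketch of how points 1, 2, and 5 could be set up (Flink 1.10 APIs as
I recall them; the checkpoint path and the example values are placeholders,
please double-check the config keys against the docs):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTuningSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Point 1: full snapshots -- pass enableIncrementalCheckpointing = false.
        RocksDBStateBackend backend =
                new RocksDBStateBackend("hdfs:///flink/checkpoints", false);
        env.setStateBackend(backend);

        // Points 2 and 5 are usually set in flink-conf.yaml, for example:
        //   state.backend.rocksdb.timer-service.factory: HEAP          (or ROCKSDB)
        //   state.backend.rocksdb.checkpoint.transfer.thread.num: 4    (FLINK-11008)
    }
}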

Best,
Congxian


Jeff Henrikson  wrote on Sun, Jun 21, 2020 at 2:46 AM:

> Bhaskar,
>
>  > Glad to know some progress.
>
> Yeah, some progress.  Yet the overnight run didn't look as good as I hoped.
>
> The throttling required to not crash during snapshots seems to be quite
> different from the throttling required to not crash outside of snapshots.
> So the lowest common denominator is quite a large performance penalty.
>
> What's worse, the rate of input that makes the snapshot performance go
> from good to bad seems to change significantly as the state size grows.
> Here is checkpoint history from an overnight run.
>
> Parameters:
>
>  - 30 minutes minimum between snapshots
>  - incremental snapshot mode
>  - inputs throttled to 100 events per sec per input per slot,
>which is around 1/4 of the unthrottled throughput
>
> Checkpoint history:
>
> ID  Status     Acknowledged  Trigger Time  Latest Acknowledgement  End to End Duration  State Size  Buffered During Alignment
> 12  COMPLETED  304/304       8:52:22       10:37:18                1h 44m 55s           60.5 GB     0 B
> 11  COMPLETED  304/304       6:47:03       8:22:19                 1h 35m 16s           53.3 GB     0 B
> 10  COMPLETED  304/304       5:01:20       6:17:00                 1h 15m 39s           41.0 GB     0 B
> 9   COMPLETED  304/304       3:47:43       4:31:19                 43m 35s              34.1 GB     0 B
> 8   COMPLETED  304/304       2:40:58       3:17:42                 36m 43s              27.8 GB     0 B
> 7   COMPLETED  304/304       1:39:15       2:10:57                 31m 42s              23.1 GB     0 B
> 6   COMPLETED  304/304       0:58:02       1:09:13                 11m 11s              17.4 GB     0 B
> 5   COMPLETED  304/304       0:23:27       0:28:01                 4m 33s               14.3 GB     0 B
> 4   COMPLETED  304/304       23:52:29      23:53:26                56s                  12.7 GB     0 B
> 3   COMPLETED  304/304       23:20:59      23:22:28                1m 29s               10.8 GB     0 B
> 2   COMPLETED  304/304       22:46:17      22:50:58                4m 40s               7.40 GB     0 B
>
> As you can see, GB/minute varies drastically.  GB/minute also varies
> drastically with full checkpoint mode.
>
> I'm pleased that it hasn't crashed yet.  Yet I'm concerned that with the
> checkpoint GB/minute getting so slow, it will crash soon.
>
> I'm really wishing state.backend.async=false worked for
> RocksDbStateBackend.
>
> I'm also wondering if my throttler would improve if I just connected to
> the REST api to ask if any checkpoint is in progress, and then paused
> inputs accordingly.  Effectively state.backend.async=false via hacked
> application code.
>
>  > Where are you updating your state here? I
>  > couldn't find any flink managed state here.
>
> The only updates to state I make are through the built-in
> DataStream.cogroup.  A unit test (without RocksDB loaded) of the way I
> use .cogroup shows exactly two ways that .cogroup calls an
> implementation of AppendingState.add.  I summarize those below.
>
> The two AppendingState subclasses invoked are HeapListState and
> HeapReducingState.  Neither has a support annotation on it, such as
> MapState's @PublicEvolving.
>
>  > I suggested updating the flink managed state using onTimer over an
>  > interval equal to the checkpoint interval.
>
> So the onTimer method, with interval set to the checkpoint interval.
> Interesting.
>
> It looks like the closest subclass for my use case would be
> KeyedCoProcessFunction.  Let me see if I understand concretely the idea:
>
> 1) between checkpoints, read join input and write join output, by
> loading any state reads from external state, but buffering all state
> changes in mem

Re: Trouble with large state

2020-06-20 Thread Jeff Henrikson

Bhaskar,

> Glad to know some progress.

Yeah, some progress.  Yet the overnight run didn't look as good as I hoped.

The throttling required to not crash during snapshots seems to be quite
different from the throttling required to not crash outside of snapshots.
So the lowest common denominator is quite a large performance penalty.


What's worse, the rate of input that makes the snapshot performance go 
from good to bad seems to change significantly as the state size grows. 
Here is checkpoint history from an overnight run.


Parameters:

- 30 minutes minimum between snapshots
- incremental snapshot mode
- inputs throttled to 100 events per sec per input per slot,
  which is around 1/4 of the unthrottled throughput

Checkpoint history:

ID  Status     Acknowledged  Trigger Time  Latest Acknowledgement  End to End Duration  State Size  Buffered During Alignment

12  COMPLETED  304/304       8:52:22       10:37:18                1h 44m 55s           60.5 GB     0 B
11  COMPLETED  304/304       6:47:03       8:22:19                 1h 35m 16s           53.3 GB     0 B
10  COMPLETED  304/304       5:01:20       6:17:00                 1h 15m 39s           41.0 GB     0 B
9   COMPLETED  304/304       3:47:43       4:31:19                 43m 35s              34.1 GB     0 B
8   COMPLETED  304/304       2:40:58       3:17:42                 36m 43s              27.8 GB     0 B
7   COMPLETED  304/304       1:39:15       2:10:57                 31m 42s              23.1 GB     0 B
6   COMPLETED  304/304       0:58:02       1:09:13                 11m 11s              17.4 GB     0 B
5   COMPLETED  304/304       0:23:27       0:28:01                 4m 33s               14.3 GB     0 B
4   COMPLETED  304/304       23:52:29      23:53:26                56s                  12.7 GB     0 B
3   COMPLETED  304/304       23:20:59      23:22:28                1m 29s               10.8 GB     0 B
2   COMPLETED  304/304       22:46:17      22:50:58                4m 40s               7.40 GB     0 B

As you can see, GB/minute varies drastically.  GB/minute also varies 
drastically with full checkpoint mode.


I'm pleased that it hasn't crashed yet.  Yet I'm concerned that with the 
checkpoint GB/minute getting so slow, it will crash soon.


I'm really wishing state.backend.async=false worked for RocksDbStateBackend.

I'm also wondering if my throttler would improve if I just connected to 
the REST api to ask if any checkpoint is in progress, and then paused 
inputs accordingly.  Effectively state.backend.async=false via hacked 
application code.
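
A rough sketch of that probe (the /jobs/:jobid/checkpoints endpoint and the
"in_progress" counter are from my reading of the REST API docs, not verified
against 1.10; host/port and job id are placeholders):

import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Scanner;

public class CheckpointInProgressProbe {

    private final String restUrl;  // e.g. "http://jobmanager:8081" (placeholder)
    private final String jobId;    // placeholder job id

    public CheckpointInProgressProbe(String restUrl, String jobId) {
        this.restUrl = restUrl;
        this.jobId = jobId;
    }

    // Ask /jobs/:jobid/checkpoints whether any checkpoint is currently running.
    public boolean checkpointInProgress() throws Exception {
        URL url = new URL(restUrl + "/jobs/" + jobId + "/checkpoints");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setConnectTimeout(2000);
        conn.setReadTimeout(2000);
        try (InputStream in = conn.getInputStream();
             Scanner scanner = new Scanner(in, "UTF-8").useDelimiter("\\A")) {
            String body = scanner.hasNext() ? scanner.next() : "";
            // Crude string check on the "counts" section; real code would parse the JSON.
            return !body.contains("\"in_progress\":0");
        } finally {
            conn.disconnect();
        }
    }
}

The throttler would then pause the sources while this returns true, which is
roughly state.backend.async=false approximated from the application side.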


> Where are you updating your state here? I
> couldn't find any flink managed state here.

The only updates to state I make are through the built-in 
DataStream.cogroup.  A unit test (without RocksDB loaded) of the way I 
use .cogroup shows exactly two ways that .cogroup calls an 
implementation of AppendingState.add.  I summarize those below.


The two AppendingState subclasses invoked are HeapListState and
HeapReducingState.  Neither has a support annotation on it, such as
MapState's @PublicEvolving.


> I suggested updating the flink managed state using onTimer over an
> interval equal to the checkpoint interval.

So the onTimer method, with interval set to the checkpoint interval. 
Interesting.


It looks like the closest subclass for my use case would be
KeyedCoProcessFunction.  Let me see if I understand concretely the idea:


1) between checkpoints, read join input and write join output, serving any
state reads from the external state, but buffering all state changes in
memory in some kind of data structure.


2) whenever a checkpoint arrives or the memory consumed by buffered
writes gets too big, flush the writes to state.


Is that the gist of the idea about .onTimer?
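
In code, I'm picturing something like this sketch (all names, types, and the
join logic are made up; the flush is driven by a processing-time timer set to
roughly the checkpoint interval, and the in-memory buffer is per subtask and
not checkpointed, so buffered writes would be lost on failure):

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

public class BufferedJoinFunction
        extends KeyedCoProcessFunction<String, String, String, String> {

    // Set roughly to the checkpoint interval.
    private static final long FLUSH_INTERVAL_MS = 30 * 60 * 1000L;

    private transient ValueState<String> joined;        // Flink managed state
    private transient Map<String, String> writeBuffer;  // in-memory only, not checkpointed

    @Override
    public void open(Configuration parameters) {
        joined = getRuntimeContext().getState(
                new ValueStateDescriptor<>("joined", String.class));
        writeBuffer = new HashMap<>();
    }

    @Override
    public void processElement1(String left, Context ctx, Collector<String> out) throws Exception {
        handle(left, ctx, out);
    }

    @Override
    public void processElement2(String right, Context ctx, Collector<String> out) throws Exception {
        handle(right, ctx, out);
    }

    private void handle(String value, Context ctx, Collector<String> out) throws Exception {
        String key = ctx.getCurrentKey();
        // Read through the buffer first; fall back to the state backend on a miss.
        String current = writeBuffer.containsKey(key) ? writeBuffer.get(key) : joined.value();
        String updated = (current == null) ? value : current + "|" + value;  // stand-in for real join logic
        writeBuffer.put(key, updated);          // buffer the write instead of hitting RocksDB
        out.collect(updated);
        // Schedule a flush; real code would avoid registering a timer per element.
        ctx.timerService().registerProcessingTimeTimer(
                ctx.timerService().currentProcessingTime() + FLUSH_INTERVAL_MS);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // Keyed state can only be written for the timer's key, so flush just that entry.
        String buffered = writeBuffer.remove(ctx.getCurrentKey());
        if (buffered != null) {
            joined.update(buffered);
        }
    }
}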


Jeff



There are two paths from .coGroup to AppendingState.add

path 1 of 2: .coGroup to HeapListState

add:90, HeapListState {org.apache.flink.runtime.state.heap}
processElement:203, EvictingWindowOperator {org.apache.flink.streaming.runtime.operators.windowing}
processElement:164, StreamOneInputProcessor {org.apache.flink.streaming.runtime.io}
processInput:143, StreamOneInputProcessor {org.apache.flink.streaming.runtime.io}



org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator#processElement

  (windowAssigner is an instance of GlobalWindows)

	@Override
	public void processElement(StreamRecord<IN> element) throws Exception {
		final Collection<W> elementWindows = windowAssigner.assignWindows(
				element.getValue(), element.getTimestamp(), windowAssignerContext);

		// if element is handled by none of assigned elementWindows
		boolean isSkippedElement = true;

		final K key = this.getKeyedStateBackend().getCurrentKey();

		if (windowAssigner instanceof MergingWindowAssigner) {
			. . .
		} else {
			for (W window : elementWindows) {

				// check if the window is already inactive
  

Re: Difference between flink on kubernetes operator vs native kubernetes

2020-06-20 Thread Yang Wang
Inline comments for your questions.

Is it similar to the first? How is the second different?

The first leverages a K8s operator to make running a standalone job cluster
on K8s easier. The second, however, is more Flink-native: Flink has an
embedded K8s client, and you can use "kubernetes-session.sh"/"flink" to start
session/job clusters. The biggest difference is dynamic resource allocation:
all the TaskManager pods are allocated/released dynamically.

How can we handle multi-tenancy in the second?

The job cluster(new named with "application" mode) will be supported in
1.11[1]. It could provide better isolation. Each
application could be submitted with specified ServiceAccount, namespace,
resources.

And since the second is still in the beta stage, what is the roadmap or the
features I can expect from the second?

Native Flink K8s integration was introduced in 1.10 and enriched with many
features in 1.11. After 1.11, the interfaces (including the command line,
config options, etc.) will not change easily, and I think it can be used in
production for some cases. However, we still have some advanced features to
complete (e.g. native K8s HA, pod templates, volume support, etc.), and I hope
they can be done in the next major release cycle (1.12).

Feel free to share your feedback. BTW, we also gave a detailed introduction,
"Integrate Flink with Kubernetes natively", at the last Flink Forward. You can
find the video here [2].

[1].
https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html#flink-kubernetes-application

[2]. https://youtu.be/pdFPr_VOWTU


Best,
Yang

SAMPAD SAHA  wrote on Sat, Jun 20, 2020 at 10:43 AM:

> I was trying to deploy Flink in Kubernetes environment and came across two
> things:
>
> 1. Kubernetes Flink control plane developed by Google and Lyft
>  - https://github.com/lyft/flinkk8soperator
>  - https://github.com/GoogleCloudPlatform/flink-on-k8s-operator
>
> 2. Deploying Flink on Kubernetes natively.
> -
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/native_kubernetes.html
>
> I would like to know the basic difference between the underlying working
> model of both.
>
> I am able to understand the working structure of the first as well as the
> scope and features.
> But for the second,
>  -Is it similar to the first? How is the second different?
>  -How can we handle multi-tenancy in the second?
>  -And since the second is still in the beta stage, what is the
> roadmap or the features I can expect
>   from the second?
>