Thanks for the follow-up, Juha. I've just assigned FLINK-19238 to you. Let's
track this further on JIRA.

Best Regards,
Yu


On Tue, 15 Sep 2020 at 15:04, Juha Mynttinen <juha.myntti...@king.com>
wrote:

> Hey
>
> I created this one https://issues.apache.org/jira/browse/FLINK-19238.
>
> Regards,
> Juha
> ------------------------------
> *From:* Yun Tang <myas...@live.com>
> *Sent:* Tuesday, September 15, 2020 8:06 AM
> *To:* Juha Mynttinen <juha.myntti...@king.com>; Stephan Ewen <
> se...@apache.org>
> *Cc:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Re: Performance issue associated with managed RocksDB memory
>
> Hi Juha
>
> Would you please consider contributing this back to the community? If so,
> please open a JIRA ticket and we can help review your PR.
>
> Best
> Yun Tang
> ------------------------------
> *From:* Juha Mynttinen <juha.myntti...@king.com>
> *Sent:* Thursday, September 10, 2020 19:05
> *To:* Stephan Ewen <se...@apache.org>
> *Cc:* Yun Tang <myas...@live.com>; user@flink.apache.org <
> user@flink.apache.org>
> *Subject:* Re: Performance issue associated with managed RocksDB memory
>
> Hey
>
> I've fixed the code 
> (https://github.com/juha-mynttinen-king/flink/commits/arena_block_sanity_check)
> slightly. Now it WARNs if the memory configuration issue is present. Also, I
> think there was a bug in the way the check calculated the mutable memory;
> I fixed that and wrote some tests.
>
> I tried the code, and in my setup I get a bunch of WARNs when the memory
> configuration issue is happening:
>
> 20200910T140320.516+0300  WARN RocksDBStateBackend performance will be
> poor because of the current Flink memory configuration! RocksDB will flush
> memtable constantly, causing high IO and CPU. Typically the easiest fix is
> to increase task manager managed memory size. If running locally, see the
> parameter taskmanager.memory.managed.size. Details: arenaBlockSize 8388608
> < mutableLimit 7829367 (writeBufferSize 67108864 arenaBlockSizeConfigured 0
> defaultArenaBlockSize 8388608 writeBufferManagerCapacity 8947848)
>  
> [org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.sanityCheckArenaBlockSize()
> @ 189]
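>
> As a reference, the remedy the warning suggests is a one-liner in a local
> setup. A minimal sketch (the 512m value is only an illustration, pick what
> fits the state size):
>
>         Configuration conf = new Configuration();
>         // The key named in the warning above; 512m is an arbitrary example value.
>         conf.setString("taskmanager.memory.managed.size", "512m");
>         StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.createLocalEnvironment(PARALLELISM, conf);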
>
> Regards,
> Juha
>
> ------------------------------
> *From:* Stephan Ewen <se...@apache.org>
> *Sent:* Wednesday, September 9, 2020 1:56 PM
> *To:* Juha Mynttinen <juha.myntti...@king.com>
> *Cc:* Yun Tang <myas...@live.com>; user@flink.apache.org <
> user@flink.apache.org>
> *Subject:* Re: Performance issue associated with managed RocksDB memory
>
> Hey Juha!
>
> I agree that we cannot reasonably expect the majority of users to
> understand block sizes, arena sizes, etc. to get their application running.
> So the default should be "inform when there is a problem and suggest to
> use more memory." Block/arena size tuning is for the absolute experts, the
> 5% super power users.
>
> The managed memory is 128 MB by default in the mini cluster. In a
> standalone session cluster setup with default config, it is 512 MB.
>
> Best,
> Stephan
>
>
>
> On Wed, Sep 9, 2020 at 11:10 AM Juha Mynttinen <juha.myntti...@king.com>
> wrote:
>
> Hey Yun,
>
> About the docs. I saw this in the docs
> (https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html):
>
> "An advanced option (expert mode) to reduce the number of MemTable flushes
> in setups with many states, is to tune RocksDB’s ColumnFamily options
> (arena block size, max background flush threads, etc.) via a
> RocksDBOptionsFactory".
>
> Only after debugging the issue we're talking about did I figure out that this
> snippet in the docs is probably about the problem I'm witnessing. I
> think there are two issues here:
>
> 1) it's hard/impossible to know what kind of performance one can expect
> from a Flink application. Thus, it's hard to know if one is suffering
> e.g. from this performance issue, or if the system is performing normally
> (and is inherently slow).
> 2) even if one suspects a performance issue, it's very hard to find the
> root cause of the performance issue (the memtable flush happening frequently).
> To find that out, one would need to know the normal flush frequency.
>
> Also, the doc says "in setups with many states". The same problem is hit
> when using just one state but a "high" parallelism (5).
>
> If the arena block size only ever needs to be configured to "fix" this
> issue, it'd be best if there were never a need to modify the arena block
> size at all. What if we stop mentioning the arena block size in the docs
> entirely and focus on the managed memory size, since the managed memory size
> is something the user actually does tune?
>
> You're right that a very clear WARN message could also help to cope with
> the issue. What if there were a WARN message saying that performance will be
> poor and that the managed memory size should be increased? And get rid of
> the arena block size decreasing example in the docs.
>
> Also, the default managed memory size is AFAIK 128MB right now. That could
> be increased. That would get rid of this issue in many cases.
>
> Regards,
> Juha
>
> ------------------------------
> *From:* Yun Tang <myas...@live.com>
> *Sent:* Tuesday, September 8, 2020 8:05 PM
> *To:* Juha Mynttinen <juha.myntti...@king.com>; user@flink.apache.org <
> user@flink.apache.org>
> *Subject:* Re: Performance issue associated with managed RocksDB memory
>
> Hi Juha
>
> I planned to add some descriptions to the Flink documentation to give such
> hints; however, it involves too many RocksDB details, and we can increase
> the managed memory size to a proper value to avoid this in most cases.
> Since you have come across this and reported it on the user mailing list, I
> think it's worth giving some hints in the Flink documentation.
>
> Regarding your idea to sanity check the arena block size, I think a
> warning should be enough, as Flink never seems to throw an exception
> directly when the performance could be poor.
>
> Best
> Yun Tang
> ------------------------------
> *From:* Juha Mynttinen <juha.myntti...@king.com>
> *Sent:* Tuesday, September 8, 2020 20:56
> *To:* Yun Tang <myas...@live.com>; user@flink.apache.org <
> user@flink.apache.org>
> *Subject:* Re: Performance issue associated with managed RocksDB memory
>
> Hey Yun,
>
> Thanks for the detailed answer. It clarified how things work, especially
> the role of the RocksDB arena and the arena block size.
>
> I think there's no real-world case where it would make sense to start a
> Flink job with RocksDB configured so that RocksDB flushes all the time,
> i.e. where the check "mutable_memtable_memory_usage() > mutable_limit_"
> is always true. The performance is just very poor, and by using the same
> amount of RAM but configuring RocksDB differently, performance could
> be e.g. 100 times better.
>
> It's very easy to hit this issue, e.g. by just running a RocksDB-based
> Flink app with either a slightly higher parallelism or with
> multiple operators. But finding out what and where the problem is, is very
> hard, e.g. because the issue happens in native code and isn't
> visible even with a Java profiler.
>
> I wanted to see if it was possible to check the sanity of the arena block
> size and just make the app crash if the arena block size is too high (or
> the mutable limit too low). I came up with this 
> https://github.com/juha-mynttinen-king/flink/tree/arena_block_sanity_check.
> The code calculates the same parameters that are calculated in RocksDB and
> throws if the arena block size is higher than the "mutable limit".
>
> I did a few quick tests and the code seems to work: with a small parallelism
> my app works, but with a higher parallelism (when the app would flush all the
> time), it crashes with a message like "arenaBlockSize 8388608 < mutableLimit
> 7340032 (writeBufferSize 67108864 arenaBlockSizeConfigured 0
> defaultArenaBlockSize 8388608 writeBufferManagerCapacity 8388608). RocksDB
> would flush memtable constantly. Refusing to start. You can 1) make arena
> block size smaller, 2) decrease parallelism (if possible), 3) increase
> managed memory"
>
> Regards,
> Juha
>
> ------------------------------
> *From:* Yun Tang <myas...@live.com>
> *Sent:* Friday, August 28, 2020 6:58 AM
> *To:* Juha Mynttinen <juha.myntti...@king.com>; user@flink.apache.org <
> user@flink.apache.org>
> *Subject:* Re: Performance issue associated with managed RocksDB memory
>
> Hi Juha
>
> Thanks for your enthusiasm in digging into this problem, and sorry for
> jumping into this thread late to share something about the write buffer
> manager in RocksDB.
>
> First of all, the reason you see the poor performance is that the write
> buffer manager has been assigned a much lower limit (due to the small managed
> memory size on that slot) than actually needed. The competition for memory
> between different column families leads RocksDB to switch the active memtable
> to an immutable memtable prematurely, which causes the poor performance as it
> increases the write amplification.
>
> To keep the memory from exceeding the limit, the write buffer manager decides
> whether to flush the memtable in advance, which is the statement you found:
> mutable_memtable_memory_usage() > mutable_limit_ [1], and the memory usage
> includes allocated but not-yet-used arena blocks.
> When talking about the arena, the memory allocator in RocksDB, I need to
> correct one thing in your thread: the block cache does not allocate any
> memory; all memory is allocated from the arena.
>
> The core idea of how RocksDB limits memory usage: the arena allocates
> memory, the write buffer manager decides when to switch memtables to control
> the active memory usage, and the write buffer manager also accounts its
> allocated memory against the cache. The underlying block cache evicts memory
> based on the accounting from the write buffer manager plus the cached blocks,
> filters & indexes.
>
> By default, arena_block_size is not configured, and it would be 1/8 of the
> write buffer size [2]. And the default write buffer size is 64MB, which is
> why you can find "Options.arena_block_size: 8388608" in your logs.
> As you can see, RocksDB thinks it can use a 64MB write buffer by default.
> However, Flink needs to control the total memory usage and has to configure
> the write buffer manager based on the managed memory. From your logs ("Write
> buffer is using 16789472 bytes out of a total of 17895697"), I believe the
> managed memory of that slot (managed memory size / number of slots in one TM)
> is quite small. If we have 1 slot in a task manager with 1GB, the managed
> memory should be near 300MB, which is fine for the default RocksDB
> configuration. However, you have only about 90MB of managed memory for that
> slot. When you enable managed memory for RocksDB, it tries its best to keep
> the total memory of all RocksDB instances within one slot under 90MB. Once
> you disable the managed memory control over RocksDB, each RocksDB instance
> can use about 64*2+8=136MB; since you have two operators here, they can use
> more than 200MB in one slot.
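>
> To make the arithmetic above concrete, a small back-of-the-envelope sketch
> (illustration only, using just the defaults and numbers mentioned in this
> thread):
>
>     public class ManagedMemoryMath {
>         public static void main(String[] args) {
>             long writeBufferSize = 64L << 20;                  // RocksDB default: 64MB
>             long defaultArenaBlockSize = writeBufferSize / 8;  // 8388608, i.e. Options.arena_block_size
>
>             // Managed memory disabled: each RocksDB instance sizes itself,
>             // roughly two 64MB write buffers plus an 8MB block cache.
>             long perInstanceMb = 64 * 2 + 8;                   // 136MB; two operators -> 200MB+ per slot
>
>             // Managed memory enabled: all instances in the slot share ~90MB, so
>             // the write buffer manager capacity lands far below a single default
>             // write buffer (the log quoted above shows 17895697 bytes, about 17MB).
>             System.out.println(defaultArenaBlockSize + " bytes arena block, "
>                     + perInstanceMb + "MB per unmanaged instance");
>         }
>     }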
>
> There exist several solutions to mitigate this regression:
>
>    1. Increase the overall managed memory size for one slot.
>    2. Increase the write buffer ratio.
>    3. Set the arena_block_size explicitly instead of the default 8MB to
>       avoid unwanted flushes in advance (see also the factory sketch below):
>
>   e.g:   new ColumnFamilyOptions().setArenaBlockSize(2 * 1024 * 1024L);
>
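>
> For completeness, a sketch of how option 3 could be wired in through a
> RocksDBOptionsFactory (the same interface as the ExampleRocksDBOptionsFactory
> in the test program below; the class name and the 2MB value are only
> illustrative, not a recommendation):
>
>     public class ArenaBlockSizeOptionsFactory implements RocksDBOptionsFactory {
>
>         @Override
>         public DBOptions createDBOptions(DBOptions currentOptions,
>                 Collection<AutoCloseable> handlesToClose) {
>             return currentOptions;
>         }
>
>         @Override
>         public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions,
>                 Collection<AutoCloseable> handlesToClose) {
>             // Explicit arena block size instead of the default writeBufferSize / 8.
>             return currentOptions.setArenaBlockSize(2 * 1024 * 1024L);
>         }
>     }
>
>     // Applied e.g. via:
>     // rocksDBStateBackend.setRocksDBOptions(new ArenaBlockSizeOptionsFactory());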
>
> [1] 
> https://github.com/dataArtisans/frocksdb/blob/49bc897d5d768026f1eb816d960c1f2383396ef4/include/rocksdb/write_buffer_manager.h#L47
> [2] 
> https://github.com/dataArtisans/frocksdb/blob/49bc897d5d768026f1eb816d960c1f2383396ef4/db/column_family.cc#L196
>
> Best
> Yun Tang
>
> ------------------------------
> *From:* Juha Mynttinen <juha.myntti...@king.com>
> *Sent:* Monday, August 24, 2020 15:56
> *To:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Re: Performance issue associated with managed RocksDB memory
>
> The issue can be reproduced by using certain combinations of the value of
> RocksDBOptions.WRITE_BUFFER_RATIO (default 0.5) and the Flink job
> parallelism.
>
> Examples that break:
> * Parallelism 1 and WRITE_BUFFER_RATIO 0.1
> * Parallelism 5 and the default WRITE_BUFFER_RATIO 0.5
>
> Examples that work:
> * Parallelism 1 and WRITE_BUFFER_RATIO 0.5, duration 34164 ms
>
> In a working case (parallelism 1, WRITE_BUFFER_RATIO 0.2) the RocksDB log
> looks like this (right after the uninteresting bootup messages):
>
> 2020/08/21-12:55:41.693771 7f56e6643700 [db/db_impl.cc:1546] Created column
> family [valueState] (ID 1)
> 2020/08/21-12:55:42.213743 7f56e6643700 [db/db_impl_write.cc:1103] Flushing
> column family with largest mem table size. Write buffer is using 16789472
> bytes out of a total of 17895697.
> 2020/08/21-12:55:42.213799 7f56e6643700 [db/db_impl_write.cc:1423]
> [valueState] New memtable created with log file: #3. Immutable memtables:
> 0.
> 2020/08/21-12:55:42.213924 7f56deffd700 (Original Log Time
> 2020/08/21-12:55:42.213882) [db/db_impl_compaction_flush.cc:1560] Calling
> FlushMemTableToOutputFile with column family [valueState], flush slots
> available 1, compaction slots available 1, flush slots scheduled 1,
> compaction slots scheduled 0
> 2020/08/21-12:55:42.213927 7f56deffd700 [db/flush_job.cc:304] [valueState]
> [JOB 2] Flushing memtable with next log file: 3
> 2020/08/21-12:55:42.213969 7f56deffd700 EVENT_LOG_v1 {"time_micros":
> 1598003742213958, "job": 2, "event": "flush_started", "num_memtables": 1,
> "num_entries": 170995, "num_deletes": 0, "memory_usage": 8399008,
> "flush_reason": "Write Buffer Full"}
> 2020/08/21-12:55:42.213973 7f56deffd700 [db/flush_job.cc:334] [valueState]
> [JOB 2] Level-0 flush table #9: started
> 2020/08/21-12:55:42.228444 7f56deffd700 EVENT_LOG_v1 {"time_micros":
> 1598003742228435, "cf_name": "valueState", "job": 2, "event":
> "table_file_creation", "file_number": 9, "file_size": 10971,
> "table_properties": {"data_size": 10200, "index_size": 168, "filter_size":
> 0, "raw_key_size": 18000, "raw_average_key_size": 18, "raw_value_size":
> 8000, "raw_average_value_size": 8, "num_data_blocks": 6, "num_entries":
> 1000, "filter_policy_name": "", "kDeletedKeys": "0", "kMergeOperands":
> "0"}}
> 2020/08/21-12:55:42.228460 7f56deffd700 [db/flush_job.cc:374] [valueState]
> [JOB 2] Level-0 flush table #9: 10971 bytes OK
>
> The main thing to look at is "num_entries": 170995, meaning RocksDB flushes
> a memtable with a fairly large number of entries. It flushes 53 times during
> the test, which sounds sensible.
>
> In a breaking case (parallelism 1, WRITE_BUFFER_RATIO 0.1) the RocksDB log
> looks like this:
>
> 2020/08/21-12:53:02.917606 7f2cabfff700 [db/db_impl_write.cc:1103] Flushing
> column family with largest mem table size. Write buffer is using 8396784
> bytes out of a total of 8947848.
> 2020/08/21-12:53:02.917702 7f2cabfff700 [db/db_impl_write.cc:1423]
> [valueState] New memtable created with log file: #3. Immutable memtables:
> 0.
> 2020/08/21-12:53:02.917988 7f2ca8bf1700 (Original Log Time
> 2020/08/21-12:53:02.917868) [db/db_impl_compaction_flush.cc:1560] Calling
> FlushMemTableToOutputFile with column family [valueState], flush slots
> available 1, compaction slots available 1, flush slots scheduled 1,
> compaction slots scheduled 0
> 2020/08/21-12:53:02.918004 7f2ca8bf1700 [db/flush_job.cc:304] [valueState]
> [JOB 2] Flushing memtable with next log file: 3
> 2020/08/21-12:53:02.918099 7f2ca8bf1700 EVENT_LOG_v1 {"time_micros":
> 1598003582918053, "job": 2, "event": "flush_started", "num_memtables": 1,
> "num_entries": 29, "num_deletes": 0, "memory_usage": 6320, "flush_reason":
> "Write Buffer Full"}
> 2020/08/21-12:53:02.918118 7f2ca8bf1700 [db/flush_job.cc:334] [valueState]
> [JOB 2] Level-0 flush table #9: started
> ...
> 2020/08/21-12:55:20.261887 7f2ca8bf1700 EVENT_LOG_v1 {"time_micros":
> 1598003720261879, "job": 20079, "event": "flush_started", "num_memtables":
> 1, "num_entries": 29, "num_deletes": 0, "memory_usage": 2240,
> "flush_reason": "Write Buffer Full"}
> 2020/08/21-12:55:20.261892 7f2ca8bf1700 [db/flush_job.cc:334] [valueState]
> [JOB 20079] Level-0 flush table #20085: started
>
> This time "num_entries" is 29, meaning RocksDB flushes the memtable when
> there are only 29 entries consuming 6320 bytes of memory. All memtable
> flushes look alike. There are 20079 flushes in total during the test, which
> is more than 300 times more than with the working config. The memtable
> flushes and the compactions they cause kill the performance.
>
> It looks like RocksDB flushes way too early, before the memtable should be
> considered full. But why? The answer lies in the RocksDB code.
>
> kingspace/frocksdb/db/db_impl_write.cc
>   if (UNLIKELY(status.ok() && write_buffer_manager_->ShouldFlush())) {
>     // Before a new memtable is added in SwitchMemtable(),
>     // write_buffer_manager_->ShouldFlush() will keep returning true. If another
>     // thread is writing to another DB with the same write buffer, they may also
>     // be flushed. We may end up with flushing much more DBs than needed. It's
>     // suboptimal but still correct.
>     status = HandleWriteBufferFull(write_context);
>   }
>
> ...
> Status DBImpl::HandleWriteBufferFull(WriteContext* write_context) {
>   mutex_.AssertHeld();
>   assert(write_context != nullptr);
>   Status status;
>
>   // Before a new memtable is added in SwitchMemtable(),
>   // write_buffer_manager_->ShouldFlush() will keep returning true. If another
>   // thread is writing to another DB with the same write buffer, they may also
>   // be flushed. We may end up with flushing much more DBs than needed. It's
>   // suboptimal but still correct.
>   ROCKS_LOG_INFO(
>       immutable_db_options_.info_log,
>       "Flushing column family with largest mem table size. Write buffer is "
>       "using %" PRIu64 " bytes out of a total of %" PRIu64 ".",
>       write_buffer_manager_->memory_usage(),
>       write_buffer_manager_->buffer_size());
>
>
> frocksdb/include/rocksdb/write_buffer_manager.h:
>
>   bool ShouldFlush() const {
>     if (enabled()) {
>       if (mutable_memtable_memory_usage() > mutable_limit_) {
>         return true;
>       }
>       if (memory_usage() >= buffer_size_ &&
>           mutable_memtable_memory_usage() >= buffer_size_ / 2) {
>         // If the memory exceeds the buffer size, we trigger more aggressive
>         // flush. But if already more than half memory is being flushed,
>         // triggering more flush may not help. We will hold it instead.
>         return true;
>       }
>     }
>     return false;
>   }
>
> Let's dig into some parameters. There's this line in the logs: "Flushing
> column family with largest mem table size. Write buffer is using 8396784
> bytes out of a total of 8947848.". From that we can see:
>
> write_buffer_manager_->memory_usage() is 8396784
> write_buffer_manager_->buffer_size() is 8947848
>
> Additionally:
>
> buffer_size_ is 8947848. This can be seen e.g. by putting a breakpoint in
> RocksDBMemoryControllerUtils.createWriteBufferManager().
> mutable_limit_ is buffer_size_ * 7 / 8 = 8947848 * 7 / 8 = 7829367
>
> In ShouldFlush() "memory_usage() >= buffer_size_" is false, so the latter
> if-statement in ShouldFlush() is false. The 1st one has to be true. I'm not
> totally sure why this happens.
>
> Now I'm guessing: the memory RocksDB uses for the block cache is counted as
> part of the memory the memtable uses (in mutable_memtable_memory_usage()).
>
> In RocksDB conf:
>
> Options.arena_block_size: 8388608
>
> If the block cache has allocated one of these blocks, this check:
>
> mutable_memtable_memory_usage() > mutable_limit_
>
> Becomes:
>
> 8388608 + really_used_by_memtable > 7829367
> 8388608 + 6320 > 7829367
>
> This is always true (even if the memtable used 0 bytes of memory), so
> ShouldFlush always returns true. This makes RocksDB flush constantly.
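>
> The same arithmetic as a tiny sketch, mirroring the first branch of the C++
> ShouldFlush() above with the values from this run (and assuming, as guessed
> above, that the usage counts the whole allocated arena block):
>
>     // Mirrors the first branch of ShouldFlush() with the values from this run.
>     long bufferSize = 8947848;               // write_buffer_manager_->buffer_size()
>     long mutableLimit = bufferSize * 7 / 8;  // 7829367
>     long arenaBlockSize = 8388608;           // Options.arena_block_size
>     long usedByMemtable = 6320;              // "memory_usage" in the flush event
>
>     // The allocated-but-unused arena block alone pushes the usage over the limit.
>     boolean shouldFlush = (arenaBlockSize + usedByMemtable) > mutableLimit; // always true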
>
> Even if I didn't understand the code correctly, somehow the flushing
> happens constantly.
>
> The RocksDB docs https://github.com/facebook/rocksdb/wiki/MemTable#flush
> say the memtable is flushed when the "write_buffer_manager signals a flush".
> It seems that the write buffer manager signaling a flush is what's happening
> here, but should it really? It feels odd (if it really is so) that the block
> cache size affects the decision of when to flush the memtable.
>
>
> Here's the latest test program. I've tested against Flink 1.11.1.
>
> /*
>  * Licensed to the Apache Software Foundation (ASF) under one or more
>  * contributor license agreements.  See the NOTICE file distributed with
>  * this work for additional information regarding copyright ownership.
>  * The ASF licenses this file to You under the Apache License, Version 2.0
>  * (the "License"); you may not use this file except in compliance with
>  * the License.  You may obtain a copy of the License at
>  *
>  *    http://www.apache.org/licenses/LICENSE-2.0
>  *
>  * Unless required by applicable law or agreed to in writing, software
>  * distributed under the License is distributed on an "AS IS" BASIS,
>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  * See the License for the specific language governing permissions and
>  * limitations under the License.
>  */
>
> package org.apache.flink.streaming.examples.wordcount;
>
> import com.google.common.util.concurrent.RateLimiter;
> import org.apache.flink.api.common.functions.RichFlatMapFunction;
> import org.apache.flink.api.common.state.ListState;
> import org.apache.flink.api.common.state.ListStateDescriptor;
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.java.utils.MultipleParameterTool;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.contrib.streaming.state.RocksDBOptions;
> import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;
> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
> import org.apache.flink.runtime.state.FunctionInitializationContext;
> import org.apache.flink.runtime.state.FunctionSnapshotContext;
> import org.apache.flink.runtime.state.StateBackend;
> import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
> import org.apache.flink.util.Collector;
> import org.rocksdb.ColumnFamilyOptions;
> import org.rocksdb.DBOptions;
> import org.rocksdb.InfoLogLevel;
>
> import java.nio.file.Files;
> import java.nio.file.Path;
> import java.util.Collection;
>
> import static
>
> org.apache.flink.contrib.streaming.state.PredefinedOptions.FLASH_SSD_OPTIMIZED;
>
> /**
>  * Works fast in the following cases.
>  * <ul>
>  *     <li>{@link #USE_MANAGED_MEMORY} is {@code false}</li>
>  *     <li>{@link #USE_MANAGED_MEMORY} is {@code true} and {@link
> #PARALLELISM} is 1 to 4.</li>
>  * </ul>
>  * <p>
>  * Some results:
>  * </p>
>  * <ul>
>  *     <li>USE_MANAGED_MEMORY false parallelism 3: 3088 ms</li>
>  *     <li>USE_MANAGED_MEMORY false parallelism 4: 2971 ms</li>
>  *     <li>USE_MANAGED_MEMORY false parallelism 5: 2994 ms</li>
>  *     <li>USE_MANAGED_MEMORY true parallelism 3: 4337 ms</li>
>  *     <li>USE_MANAGED_MEMORY true parallelism 4: 2808 ms</li>
>  *     <li>USE_MANAGED_MEMORY true parallelism 5: 126050 ms</li>
>  * </ul>
>  * <p>
>  */
> public class WordCount {
>     /**
>      * The parallelism of the job.
>      */
>     private static final int PARALLELISM = 1;
>
>     /**
>      * Whether to use managed memory. True, no changes in the config.
>      * False, managed memory is disabled.
>      */
>     private static final boolean USE_MANAGED_MEMORY = true;
>
>     /**
>      * If {@link #USE_MANAGED_MEMORY} is {@code true} has effect.
>      * Sets the {@link RocksDBOptions#WRITE_BUFFER_RATIO}.
>      */
>     private static Double WRITE_BUFFER_RATIO = 0.1;
>
>     /**
>      * The source synthesizes this many events.
>      */
>     public static final int EVENT_COUNT = 1_000_000;
>
>     /**
>      * The value of each event is {@code count % MAX_VALUE}.
>      * Essentially controls the count of unique keys.
>      */
>     public static final int MAX_VALUE = 1_000;
>
>     /**
>      * If non-null, rate limits the events from the source.
>      */
>     public static final Integer SOURCE_EVENTS_PER_SECOND = null;
>
>     public static final boolean ENABLE_ROCKS_LOGGING = true;
>
>
>     //
> *************************************************************************
>     // PROGRAM
>     //
> *************************************************************************
>
>     public static void main(String[] args) throws Exception {
>
>         // Checking input parameters
>         final MultipleParameterTool params =
> MultipleParameterTool.fromArgs(args);
>
>         // set up the execution environment
>         Configuration configuration = new Configuration();
>         if (!USE_MANAGED_MEMORY) {
>             configuration.setBoolean(RocksDBOptions.USE_MANAGED_MEMORY,
> USE_MANAGED_MEMORY);
>         } else {
>             if (WRITE_BUFFER_RATIO != null) {
>                 configuration.setDouble(RocksDBOptions.WRITE_BUFFER_RATIO,
> WRITE_BUFFER_RATIO.doubleValue());
>             }
>         }
>         final StreamExecutionEnvironment env =
> StreamExecutionEnvironment.createLocalEnvironment(PARALLELISM,
> configuration);
>
>         Path tempDirPath = Files.createTempDirectory("example");
>         String checkpointDataUri = "file://" + tempDirPath.toString();
>
>         RocksDBStateBackend rocksDBStateBackend = new
> RocksDBStateBackend(checkpointDataUri, true);
>         if (ENABLE_ROCKS_LOGGING) {
>             rocksDBStateBackend.setRocksDBOptions(new
> ExampleRocksDBOptionsFactory());
>         } else {
>             rocksDBStateBackend.setPredefinedOptions(FLASH_SSD_OPTIMIZED);
>         }
>
>         env.setStateBackend((StateBackend) rocksDBStateBackend);
>
>         // make parameters available in the web interface
>         env.getConfig().setGlobalJobParameters(params);
>
>         // get input data
>         DataStream<Long> text = env.addSource(new ExampleCountSource());
>
>         text.keyBy(v -> v)
>                 .flatMap(new ValuesCounter())
>                 .addSink(new DiscardingSink<>());
>
>         long before = System.currentTimeMillis();
>         env.execute("Streaming WordCount");
>         long duration = System.currentTimeMillis() - before;
>
>         System.out.println("Done " + duration + " ms, parallelism " +
> PARALLELISM);
>     }
>
>
>     private static class ExampleRocksDBOptionsFactory implements
> RocksDBOptionsFactory {
>
>         @Override
>         public DBOptions createDBOptions(DBOptions currentOptions,
> Collection<AutoCloseable> handlesToClose) {
>             currentOptions.setIncreaseParallelism(4)
>                     .setUseFsync(false)
>                     .setMaxOpenFiles(-1)
>                     .setKeepLogFileNum(10)
>                     .setInfoLogLevel(InfoLogLevel.INFO_LEVEL)
>                     .setStatsDumpPeriodSec(0)
>                     .setMaxLogFileSize(100 * 1024 * 1024); // 100 MB each
>
>             return currentOptions;
>         }
>
>         @Override
>         public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions
> currentOptions, Collection<AutoCloseable> handlesToClose) {
>             return currentOptions;
>         }
>     }
>
>     //
> *************************************************************************
>     // USER FUNCTIONS
>     //
> *************************************************************************
>
>     private static class ValuesCounter extends RichFlatMapFunction<Long,
> Tuple2<Long, Long>> {
>         private ValueState<Long> state;
>
>
>         @Override
>         public void flatMap(Long value, Collector<Tuple2<Long, Long>>
> out) throws Exception {
>             Long oldCount = state.value();
>             if (oldCount == null) {
>                 oldCount = 0L;
>             }
>             long newCount = oldCount + 1;
>             state.update(newCount);
>
>             out.collect(Tuple2.of(value, newCount));
>         }
>
>         @Override
>         public void open(Configuration parameters) throws Exception {
>             super.open(parameters);
>
>             ValueStateDescriptor<Long> descriptor = new
> ValueStateDescriptor<>("valueState", BasicTypeInfo.LONG_TYPE_INFO);
>             state = getRuntimeContext().getState(descriptor);
>         }
>     }
>
>     public static class ExampleCountSource implements SourceFunction<Long>,
> CheckpointedFunction {
>         private long count = 0L;
>         private volatile boolean isRunning = true;
>
>         private transient ListState<Long> checkpointedCount;
>
>         private static final RateLimiter rateLimiter =
> SOURCE_EVENTS_PER_SECOND != null ?
> RateLimiter.create(SOURCE_EVENTS_PER_SECOND) : null;
>
>         public void run(SourceContext<Long> ctx) {
>             while (isRunning && count < EVENT_COUNT) {
>                 if (rateLimiter != null) {
>                     rateLimiter.acquire();
>                 }
>                 // this synchronized block ensures that state
> checkpointing,
>                 // internal state updates and emission of elements are an
> atomic operation
>                 synchronized (ctx.getCheckpointLock()) {
>                     ctx.collect(count % MAX_VALUE);
>                     count++;
>                 }
>             }
>         }
>
>         public void cancel() {
>             isRunning = false;
>         }
>
>         public void initializeState(FunctionInitializationContext context)
> throws Exception {
>             this.checkpointedCount = context
>                     .getOperatorStateStore()
>                     .getListState(new ListStateDescriptor<>("count",
> Long.class));
>
>             if (context.isRestored()) {
>                 for (Long count : this.checkpointedCount.get()) {
>                     this.count = count;
>                 }
>             }
>         }
>
>         public void snapshotState(FunctionSnapshotContext context) throws
> Exception {
>             this.checkpointedCount.clear();
>             this.checkpointedCount.add(count);
>         }
>     }
> }
>
>
>
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
>
