Hi Bajaj

The current "state.checkpoints.dir" option defines a cluster-wide location, and 
each job creates its own checkpoint location under it, in a sub-directory named 
after the job id. The same applies to the checkpoint URL passed to 
RocksDBStateBackend.
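
To make the layout concrete, here is a small illustrative Java helper. It is not 
Flink code, and the class and method names are hypothetical. It assumes the 
"<state.checkpoints.dir>/<job-id>/chk-<n>" layout used by Flink's file-system 
checkpoint storage:

```java
/** Illustrative only: hypothetical helper mirroring the per-job checkpoint layout. */
final class CheckpointLayout {

    // <state.checkpoints.dir>/<job-id>: one sub-directory per job under the cluster-wide location
    static String jobCheckpointDir(String checkpointsDir, String jobId) {
        String base = checkpointsDir.endsWith("/")
                ? checkpointsDir.substring(0, checkpointsDir.length() - 1)
                : checkpointsDir;
        return base + "/" + jobId;
    }

    // <state.checkpoints.dir>/<job-id>/chk-<n>: directory of checkpoint number n
    static String checkpointDir(String checkpointsDir, String jobId, long checkpointId) {
        return jobCheckpointDir(checkpointsDir, jobId) + "/chk-" + checkpointId;
    }
}
```

For example, with "state.checkpoints.dir: hdfs:///flink/checkpoints" and a 
placeholder job id "a1b2c3", checkpoint 17 would land in 
hdfs:///flink/checkpoints/a1b2c3/chk-17, so the job itself never needs to know 
the cluster-wide base path.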

The configuration option "state.backend.rocksdb.localdir" [1] should also work 
for RocksDB in Flink 1.7.1.

[1] 
https://github.com/apache/flink/blob/808cc1a23abb25bd03d24d75537a1e7c6987eef7/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java#L285-L301
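
The fallback behind [1] can be pictured with a simplified, stdlib-only Java 
model. It assumes the linked code follows an "explicit job setting first, 
cluster configuration second" pattern and splits the configured value on ',' or 
the OS path separator. The class and method names are hypothetical, and the real 
RocksDBStateBackend code may differ in details such as path validation:

```java
import java.io.File;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Simplified model (not Flink code) of resolving RocksDB local directories. */
final class LocalDirResolution {

    static final String LOCAL_DIR_KEY = "state.backend.rocksdb.localdir";

    // explicitDirs: directories set in the job (e.g. via setDbStoragePath), or null if unset.
    // clusterConfig: hypothetical key/value stand-in for the loaded flink-conf.yaml.
    static List<String> resolve(List<String> explicitDirs, Map<String, String> clusterConfig) {
        if (explicitDirs != null) {
            return explicitDirs;                       // a job-level setting wins
        }
        String configured = clusterConfig.get(LOCAL_DIR_KEY);
        if (configured == null || configured.trim().isEmpty()) {
            return Collections.emptyList();            // nothing configured: backend defaults apply
        }
        // Several directories may be listed, separated by ',' or the OS path separator
        return Arrays.asList(configured.split(",|" + File.pathSeparator));
    }
}
```

Under this model, a job that calls setDbStoragePath always overrides the 
cluster-wide value, while a job that leaves it unset inherits whatever the 
configuration provides.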

Best
Yun Tang
________________________________
From: Bajaj, Abhinav <abhinav.ba...@here.com>
Sent: Tuesday, April 28, 2020 8:03
To: user@flink.apache.org <user@flink.apache.org>
Cc: Chesnay Schepler <ches...@apache.org>
Subject: Re: RocksDB default logging configuration


It seems that requiring the checkpoint URL to create the RocksDBStateBackend 
mixes the operational aspects of the cluster into the job.

RocksDBStateBackend stateBackend = new RocksDBStateBackend("CHECKPOINT_URL", true);
stateBackend.setDbStoragePath("DB_STORAGE_PATH");



I also noticed that the RocksDBStateBackend picks up the savepoint directory from 
the "state.savepoints.dir" property in flink-conf.yaml, but does not pick up 
"state.backend.rocksdb.localdir".

So I had to set it from the job, as shown above.



I feel there is a disconnect here and would like to get confirmation of the 
above behavior, if possible.

I am using Flink 1.7.1.



Thanks Chesnay for your response below.



~ Abhinav Bajaj



From: Chesnay Schepler <ches...@apache.org>
Date: Wednesday, April 22, 2020 at 11:17 PM
To: "Bajaj, Abhinav" <abhinav.ba...@here.com>, "user@flink.apache.org" 
<user@flink.apache.org>
Subject: Re: RocksDB default logging configuration



AFAIK this is not possible; the client doesn't know anything about the cluster 
configuration.



FLINK-15747 proposes to add an additional config option for controlling the 
logging behavior.



The only workaround I can think of would be to create a custom Flink 
distribution with a modified RocksDBStateBackend which always sets these 
options by default.
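
The idea behind such a modified backend is to apply cluster-wide defaults first 
and the job's own options on top of them, so that jobs can still override 
individual settings. A minimal stdlib-only Java model of that layering follows; 
LogSettings is a hypothetical stand-in for the few RocksDB DBOptions fields 
discussed in this thread, and layered() is not a Flink or RocksDB API:

```java
import java.util.function.UnaryOperator;

/** Hypothetical stand-in for the DBOptions logging fields discussed in this thread. */
final class LogSettings {
    String infoLogLevel = "INFO_LEVEL";   // illustrative starting value only
    long maxLogFileSize = 0L;             // in RocksDB, 0 means no size-based rotation

    /** Cluster defaults run first; job options run last, so they take precedence. */
    static UnaryOperator<LogSettings> layered(UnaryOperator<LogSettings> clusterDefaults,
                                              UnaryOperator<LogSettings> jobOptions) {
        return settings -> jobOptions.apply(clusterDefaults.apply(settings));
    }
}
```

With this ordering, a job that only changes the maximum log file size still 
inherits the cluster's log level.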





On 23/04/2020 03:24, Bajaj, Abhinav wrote:

Bumping this one again to catch some attention.



From: "Bajaj, Abhinav" <abhinav.ba...@here.com>
Date: Monday, April 20, 2020 at 3:23 PM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: RocksDB default logging configuration



Hi,



Some of our teams ran into disk space issues caused by RocksDB's default logging 
configuration (FLINK-15068 <https://issues.apache.org/jira/browse/FLINK-15068>).

It seems the suggested workaround uses an OptionsFactory to set some of these 
parameters from inside the job.



Since we provision the Flink cluster (version 1.7.1) for the teams, we control 
the RocksDB state backend configuration from flink-conf.yaml.

However, there doesn't seem to be any related RocksDB option in the 
configuration reference 
<https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html#rocksdb-state-backend> 
that we could set in flink-conf.yaml.



Is there a way for the job developer to retrieve the cluster's default state 
backend in the job and set the DBOptions on top of it?



Appreciate the help!



~ Abhinav Bajaj



PS: Sharing the snippet below as the desired option, if possible:



StreamExecutionEnvironment streamExecEnv =
    StreamExecutionEnvironment.getExecutionEnvironment();

// Desired API: getDefaultStateBackend() does not exist in Flink 1.7.1, and
// setOptions(...) is defined on RocksDBStateBackend, not on StateBackend
StateBackend stateBackend = streamExecEnv.getDefaultStateBackend();

stateBackend.setOptions(new OptionsFactory() {

    @Override
    public DBOptions createDBOptions(DBOptions dbOptions) {
        // Log only warnings and above, and rotate the LOG file at 1 MiB
        dbOptions.setInfoLogLevel(InfoLogLevel.WARN_LEVEL);
        dbOptions.setMaxLogFileSize(1024 * 1024);
        return dbOptions;
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions columnFamilyOptions) {
        // Column family options are left unchanged
        return columnFamilyOptions;
    }

});
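
As a rough illustration of why capping the file size alone may not be enough: 
with the snippet above, the RocksDB info LOG rotates at 1 MiB, but RocksDB 
retains up to keep_log_file_num old log files, 1000 by default. The Java sketch 
below is a back-of-the-envelope estimate, assuming total usage is roughly 
max_log_file_size times keep_log_file_num per RocksDB instance (the exact number 
of retained files may differ by one); the class and method names are 
hypothetical:

```java
/** Back-of-the-envelope bound on RocksDB info-log disk usage (illustrative only). */
final class RocksDbLogBudget {

    // RocksDB's default keep_log_file_num (number of info log files retained)
    static final long DEFAULT_KEEP_LOG_FILE_NUM = 1000;

    /** Approximate upper bound in bytes for one RocksDB instance. */
    static long approxMaxBytesPerInstance(long maxLogFileSize, long keepLogFileNum) {
        return maxLogFileSize * keepLogFileNum;
    }

    /** Approximate bound across several RocksDB instances on one TaskManager. */
    static long approxMaxBytesTotal(long maxLogFileSize, long keepLogFileNum, int instances) {
        return approxMaxBytesPerInstance(maxLogFileSize, keepLogFileNum) * instances;
    }
}
```

Under these assumptions, 1 MiB times 1000 is on the order of 1 GB per instance, 
multiplied by the number of RocksDB instances on the TaskManager (typically one 
per stateful operator subtask). The OptionsFactory could additionally call 
dbOptions.setKeepLogFileNum(...) to lower that bound.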





