This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit d9bcb3b40ed5cefadbbaf391dacaa0ecd7fc8243 Author: JunRuiLee <jrlee....@gmail.com> AuthorDate: Tue Nov 21 10:06:10 2023 +0800 [FLINK-33669][doc] Update the usage of configuring checkpoint storage in docs. --- docs/content.zh/docs/deployment/filesystems/gcs.md | 11 +++++---- docs/content.zh/docs/ops/state/state_backends.md | 28 +++++++++++++--------- docs/content/docs/deployment/filesystems/azure.md | 5 +++- docs/content/docs/deployment/filesystems/gcs.md | 5 +++- docs/content/docs/deployment/filesystems/oss.md | 5 +++- docs/content/docs/deployment/filesystems/s3.md | 5 +++- .../datastream/fault-tolerance/checkpointing.md | 13 ++++++++-- docs/content/docs/ops/state/checkpoints.md | 12 +++++++--- docs/content/docs/ops/state/state_backends.md | 28 +++++++++++++--------- 9 files changed, 77 insertions(+), 35 deletions(-) diff --git a/docs/content.zh/docs/deployment/filesystems/gcs.md b/docs/content.zh/docs/deployment/filesystems/gcs.md index f80e5b3af4a..47cd137db33 100644 --- a/docs/content.zh/docs/deployment/filesystems/gcs.md +++ b/docs/content.zh/docs/deployment/filesystems/gcs.md @@ -37,14 +37,17 @@ gs://<your-bucket>/<endpoint> The endpoint can either be a single file or a directory, for example: ```java -// Read from GCS bucket +// 读取 GCS bucket env.readTextFile("gs://<bucket>/<endpoint>"); -// Write to GCS bucket +// 写入 GCS bucket stream.writeAsText("gs://<bucket>/<endpoint>"); -// Use GCS as checkpoint storage -env.getCheckpointConfig().setCheckpointStorage("gs://<bucket>/<endpoint>"); +// 将 GCS 用作 checkpoint storage +Configuration config = new Configuration(); +config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem"); +config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "gs://<bucket>/<endpoint>"); +env.configure(config); ``` diff --git a/docs/content.zh/docs/ops/state/state_backends.md b/docs/content.zh/docs/ops/state/state_backends.md index bca2bc7af5b..eda37dada7e 100644 --- a/docs/content.zh/docs/ops/state/state_backends.md +++ 
b/docs/content.zh/docs/ops/state/state_backends.md @@ -516,8 +516,8 @@ env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage) ```python config = Configuration() config.set_string('state.backend.type', 'hashmap') +config.set_string('state.checkpoint-storage', 'jobmanager') env = StreamExecutionEnvironment.get_execution_environment(config) -env.get_checkpoint_config().set_checkpoint_storage(JobManagerCheckpointStorage()) ``` {{< /tab >}} {{< /tabs>}} @@ -550,8 +550,9 @@ env.configure(config); // Advanced FsStateBackend configurations, such as write buffer size -// can be set by manually instantiating a FileSystemCheckpointStorage object. -env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir")); +// can be set manually by using CheckpointingOptions. +config.set(CheckpointingOptions.FS_WRITE_BUFFER_SIZE, 4 * 1024); +env.configure(config); ``` {{< /tab >}} {{< tab "Scala" >}} @@ -570,13 +571,15 @@ env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage(" ```python config = Configuration() config.set_string('state.backend.type', 'hashmap') +config.set_string('state.checkpoint-storage', 'filesystem') +config.set_string('state.checkpoints.dir', 'file:///checkpoint-dir') env = StreamExecutionEnvironment.get_execution_environment(config) -env.get_checkpoint_config().set_checkpoint_storage_dir("file:///checkpoint-dir") # Advanced FsStateBackend configurations, such as write buffer size -# can be set by manually instantiating a FileSystemCheckpointStorage object. -env.get_checkpoint_config().set_checkpoint_storage(FileSystemCheckpointStorage("file:///checkpoint-dir")) +# can be set manually by using CheckpointingOptions. 
+config.set_string('state.storage.fs.write-buffer-size', '4096') +env.configure(config); ``` {{< /tab >}} {{< /tabs>}} @@ -610,8 +613,9 @@ env.configure(config); // If you manually passed FsStateBackend into the RocksDBStateBackend constructor // to specify advanced checkpointing configurations such as write buffer size, -// you can achieve the same results by using manually instantiating a FileSystemCheckpointStorage object. -env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir")); +// you can achieve the same results by using CheckpointingOptions. +config.set(CheckpointingOptions.FS_WRITE_BUFFER_SIZE, 4 * 1024); +env.configure(config); ``` {{< /tab >}} {{< tab "Scala" >}} @@ -631,14 +635,16 @@ env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage(" ```python config = Configuration() config.set_string('state.backend.type', 'hashmap') +config.set_string('state.checkpoint-storage', 'filesystem') +config.set_string('state.checkpoints.dir', 'file:///checkpoint-dir') env = StreamExecutionEnvironment.get_execution_environment(config) -env.get_checkpoint_config().set_checkpoint_storage_dir("file:///checkpoint-dir") # If you manually passed FsStateBackend into the RocksDBStateBackend constructor # to specify advanced checkpointing configurations such as write buffer size, -# you can achieve the same results by using manually instantiating a FileSystemCheckpointStorage object. -env.get_checkpoint_config().set_checkpoint_storage(FileSystemCheckpointStorage("file:///checkpoint-dir")) +# you can achieve the same results by using CheckpointingOptions. 
+config.set_string('state.storage.fs.write-buffer-size', '4096') +env.configure(config); ``` {{< /tab >}} {{< /tabs>}} diff --git a/docs/content/docs/deployment/filesystems/azure.md b/docs/content/docs/deployment/filesystems/azure.md index c9d49f52a41..0e761606e02 100644 --- a/docs/content/docs/deployment/filesystems/azure.md +++ b/docs/content/docs/deployment/filesystems/azure.md @@ -67,7 +67,10 @@ env.readTextFile("wasb://<your-container>@$<your-azure-account>.blob.core.window stream.writeAsText("wasb://<your-container>@$<your-azure-account>.blob.core.windows.net/<object-path>"); // Use Azure Blob Storage as checkpoint storage -env.getCheckpointConfig().setCheckpointStorage("wasb://<your-container>@$<your-azure-account>.blob.core.windows.net/<object-path>"); +Configuration config = new Configuration(); +config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem"); +config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "wasb://<your-container>@$<your-azure-account>.blob.core.windows.net/<object-path>"); +env.configure(config); ``` ### Shaded Hadoop Azure Blob Storage file system diff --git a/docs/content/docs/deployment/filesystems/gcs.md b/docs/content/docs/deployment/filesystems/gcs.md index bd097b3a4c4..e6f8520b019 100644 --- a/docs/content/docs/deployment/filesystems/gcs.md +++ b/docs/content/docs/deployment/filesystems/gcs.md @@ -44,7 +44,10 @@ env.readTextFile("gs://<bucket>/<endpoint>"); stream.writeAsText("gs://<bucket>/<endpoint>"); // Use GCS as checkpoint storage -env.getCheckpointConfig().setCheckpointStorage("gs://<bucket>/<endpoint>"); +Configuration config = new Configuration(); +config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem"); +config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "gs://<bucket>/<endpoint>"); +env.configure(config); ``` diff --git a/docs/content/docs/deployment/filesystems/oss.md b/docs/content/docs/deployment/filesystems/oss.md index 0dc0ebabc09..9025f7e2170 100644 --- 
a/docs/content/docs/deployment/filesystems/oss.md +++ b/docs/content/docs/deployment/filesystems/oss.md @@ -50,7 +50,10 @@ env.readTextFile("oss://<your-bucket>/<object-name>"); stream.writeAsText("oss://<your-bucket>/<object-name>"); // Use OSS as checkpoint storage -env.getCheckpointConfig().setCheckpointStorage("oss://<your-bucket>/<object-name>"); +Configuration config = new Configuration(); +config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem"); +config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "oss://<your-bucket>/<object-name>"); +env.configure(config); ``` ### Shaded Hadoop OSS file system diff --git a/docs/content/docs/deployment/filesystems/s3.md b/docs/content/docs/deployment/filesystems/s3.md index 7377557f268..3488258002d 100644 --- a/docs/content/docs/deployment/filesystems/s3.md +++ b/docs/content/docs/deployment/filesystems/s3.md @@ -45,7 +45,10 @@ env.readTextFile("s3://<bucket>/<endpoint>"); stream.writeAsText("s3://<bucket>/<endpoint>"); // Use S3 as checkpoint storage -env.getCheckpointConfig().setCheckpointStorage("s3://<your-bucket>/<endpoint>"); +Configuration config = new Configuration(); +config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem"); +config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "s3://<your-bucket>/<endpoint>"); +env.configure(config); ``` Note that these examples are *not* exhaustive and you can use S3 in other places as well, including your [high availability setup]({{< ref "docs/deployment/ha/overview" >}}) or the [EmbeddedRocksDBStateBackend]({{< ref "docs/ops/state/state_backends" >}}#the-rocksdbstatebackend); everywhere that Flink expects a FileSystem URI (unless otherwise stated). 
diff --git a/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md b/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md index b7fba7feeaf..a17c21bee5f 100644 --- a/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md +++ b/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md @@ -123,7 +123,10 @@ env.getCheckpointConfig().setExternalizedCheckpointCleanup( env.getCheckpointConfig().enableUnalignedCheckpoints(); // sets the checkpoint storage where checkpoint snapshots will be written -env.getCheckpointConfig().setCheckpointStorage("hdfs:///my/checkpoint/dir"); +Configuration config = new Configuration(); +config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem"); +config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "hdfs:///my/checkpoint/dir"); +env.configure(config); // enable checkpointing with finished tasks Configuration config = new Configuration(); @@ -222,7 +225,13 @@ Where the checkpoints are stored (e.g., JobManager memory, file system, database By default, checkpoints are stored in memory in the JobManager. For proper persistence of large state, Flink supports various approaches for checkpointing state in other locations. -The choice of checkpoint storage can be configured via `StreamExecutionEnvironment.getCheckpointConfig().setCheckpointStorage(…)`. +The choice of checkpoint storage can be configured like the following code snippet. +```java +Configuration config = new Configuration(); +config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem"); +config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "..."); +env.configure(config); +``` It is strongly encouraged that checkpoints be stored in a highly-available filesystem for production deployments. See [checkpoint storage]({{< ref "docs/ops/state/checkpoints#checkpoint-storage" >}}) for more details on the available options for job-wide and cluster-wide configuration. 
diff --git a/docs/content/docs/ops/state/checkpoints.md b/docs/content/docs/ops/state/checkpoints.md index 12e9416c5c2..f316845e115 100644 --- a/docs/content/docs/ops/state/checkpoints.md +++ b/docs/content/docs/ops/state/checkpoints.md @@ -144,7 +144,10 @@ state.checkpoints.dir: hdfs:///checkpoints/ #### Configure for per job on the checkpoint configuration ```java -env.getCheckpointConfig().setCheckpointStorage("hdfs:///checkpoints-data/"); +Configuration config = new Configuration(); +config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem"); +config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "hdfs:///checkpoints-data/"); +env.configure(config); ``` #### Configure with checkpoint storage instance @@ -152,8 +155,11 @@ env.getCheckpointConfig().setCheckpointStorage("hdfs:///checkpoints-data/"); Alternatively, checkpoint storage can be set by specifying the desired checkpoint storage instance which allows for setting low level configurations such as write buffer sizes. ```java -env.getCheckpointConfig().setCheckpointStorage( - new FileSystemCheckpointStorage("hdfs:///checkpoints-data/", FILE_SIZE_THESHOLD)); +Configuration config = new Configuration(); +config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem"); +config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "hdfs:///checkpoints-data/"); +config.set(CheckpointingOptions.FS_WRITE_BUFFER_SIZE, FILE_SIZE_THRESHOLD); +env.configure(config); ``` ### Resuming from a retained checkpoint diff --git a/docs/content/docs/ops/state/state_backends.md b/docs/content/docs/ops/state/state_backends.md index 5ccd28d8e55..b645eefcd8b 100644 --- a/docs/content/docs/ops/state/state_backends.md +++ b/docs/content/docs/ops/state/state_backends.md @@ -506,8 +506,8 @@ env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage) ```python config = Configuration() config.set_string('state.backend.type', 'hashmap') +config.set_string('state.checkpoint-storage', 'jobmanager') env = 
StreamExecutionEnvironment.get_execution_environment(config) -env.get_checkpoint_config().set_checkpoint_storage(JobManagerCheckpointStorage()) ``` {{< /tab >}} {{< /tabs>}} @@ -540,8 +540,9 @@ env.configure(config); // Advanced FsStateBackend configurations, such as write buffer size -// can be set by manually instantiating a FileSystemCheckpointStorage object. -env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir")); +// can be set manually by using CheckpointingOptions. +config.set(CheckpointingOptions.FS_WRITE_BUFFER_SIZE, 4 * 1024); +env.configure(config); ``` {{< /tab >}} {{< tab "Scala" >}} @@ -560,13 +561,15 @@ env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage(" ```python config = Configuration() config.set_string('state.backend.type', 'hashmap') +config.set_string('state.checkpoint-storage', 'filesystem') +config.set_string('state.checkpoints.dir', 'file:///checkpoint-dir') env = StreamExecutionEnvironment.get_execution_environment(config) -env.get_checkpoint_config().set_checkpoint_storage_dir("file:///checkpoint-dir") # Advanced FsStateBackend configurations, such as write buffer size -# can be set by manually instantiating a FileSystemCheckpointStorage object. -env.get_checkpoint_config().set_checkpoint_storage(FileSystemCheckpointStorage("file:///checkpoint-dir")) +# can be set manually by using CheckpointingOptions. +config.set_string('state.storage.fs.write-buffer-size', '4096') +env.configure(config); ``` {{< /tab >}} {{< /tabs>}} @@ -600,8 +603,9 @@ env.configure(config); // If you manually passed FsStateBackend into the RocksDBStateBackend constructor // to specify advanced checkpointing configurations such as write buffer size, -// you can achieve the same results by using manually instantiating a FileSystemCheckpointStorage object. 
-env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir")); +// you can achieve the same results by using CheckpointingOptions. +config.set(CheckpointingOptions.FS_WRITE_BUFFER_SIZE, 4 * 1024); +env.configure(config); ``` {{< /tab >}} {{< tab "Scala" >}} @@ -621,14 +625,16 @@ env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage(" ```python config = Configuration() config.set_string('state.backend.type', 'rocksdb') +config.set_string('state.checkpoint-storage', 'filesystem') +config.set_string('state.checkpoints.dir', 'file:///checkpoint-dir') env = StreamExecutionEnvironment.get_execution_environment(config) -env.get_checkpoint_config().set_checkpoint_storage_dir("file:///checkpoint-dir") # If you manually passed FsStateBackend into the RocksDBStateBackend constructor # to specify advanced checkpointing configurations such as write buffer size, -# you can achieve the same results by using manually instantiating a FileSystemCheckpointStorage object. -env.get_checkpoint_config().set_checkpoint_storage(FileSystemCheckpointStorage("file:///checkpoint-dir")) +# you can achieve the same results by using CheckpointingOptions. +config.set_string('state.storage.fs.write-buffer-size', '4096') +env.configure(config); ``` {{< /tab >}} {{< /tabs>}}