This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new e8cbfbfae feat(bindings/nodejs): add concurrent limit layer (#6739)
e8cbfbfae is described below
commit e8cbfbfae9a97bdbd6f0269fed33b6710effb014
Author: Kingsword <[email protected]>
AuthorDate: Mon Oct 27 17:02:30 2025 +0800
feat(bindings/nodejs): add concurrent limit layer (#6739)
* feat(bindings/nodejs): add ConcurrentLimitLayer
* add test
* format code
* ci
* fix test
* ci
---
.github/workflows/ci_bindings_nodejs.yml | 10 +-
bindings/nodejs/generated.d.ts | 61 ++++++-
bindings/nodejs/generated.js | 1 +
bindings/nodejs/index.cjs | 11 +-
bindings/nodejs/index.mjs | 3 +-
bindings/nodejs/src/layer.rs | 235 +++++++++++++++++++++++++++
bindings/nodejs/src/lib.rs | 133 +--------------
bindings/nodejs/tests/suites/index.mjs | 2 +
bindings/nodejs/tests/suites/layer.suite.mjs | 55 +++++++
9 files changed, 375 insertions(+), 136 deletions(-)
diff --git a/.github/workflows/ci_bindings_nodejs.yml b/.github/workflows/ci_bindings_nodejs.yml
index 3fb3287e1..2bd6bfa1f 100644
--- a/.github/workflows/ci_bindings_nodejs.yml
+++ b/.github/workflows/ci_bindings_nodejs.yml
@@ -40,6 +40,10 @@ on:
- ".github/workflows/ci_bindings_nodejs.yml"
workflow_dispatch:
+concurrency:
+ group: ${{ github.workflow }}-${{ github.ref }}-${{ github.event_name }}
+ cancel-in-progress: true
+
jobs:
test:
runs-on: ubuntu-latest
@@ -72,6 +76,9 @@ jobs:
- name: Check format
run: pnpm exec prettier --check .
+ - name: Check Clippy
+ run: cargo clippy -- -D warnings
+
- name: Build
run: pnpm build
@@ -80,6 +87,3 @@ jobs:
- name: Test
run: cargo test --no-fail-fast
-
- - name: Check Clippy
- run: cargo clippy -- -D warnings
diff --git a/bindings/nodejs/generated.d.ts b/bindings/nodejs/generated.d.ts
index ab99e28dc..c2219d529 100644
--- a/bindings/nodejs/generated.d.ts
+++ b/bindings/nodejs/generated.d.ts
@@ -205,6 +205,65 @@ export declare class Capability {
get shared(): boolean
}
+/**
+ * Concurrent limit layer
+ *
+ * Add concurrent request limit.
+ *
+ * # Notes
+ *
+ * Users can control how many concurrent connections could be established
+ * between OpenDAL and underlying storage services.
+ *
+ * All operators wrapped by this layer will share a common semaphore.
+ *
+ * # Examples
+ *
+ * ```javascript
+ * const op = new Operator("fs", { root: "/tmp" })
+ *
+ * // Create a concurrent limit layer with 1024 permits
+ * const limit = new ConcurrentLimitLayer(1024);
+ * op.layer(limit.build());
+ * ```
+ *
+ * With HTTP concurrent limit:
+ *
+ * ```javascript
+ * const limit = new ConcurrentLimitLayer(1024);
+ * limit.httpPermits = 512;
+ * op.layer(limit.build());
+ * ```
+ */
+export declare class ConcurrentLimitLayer {
+ /**
+ * Create a new ConcurrentLimitLayer with specified permits.
+ *
+ * These permits will be applied to all operations.
+ *
+ * # Arguments
+ *
+ * * `permits` - The maximum number of concurrent operations allowed.
+ */
+ constructor(permits: number)
+ /**
+ * Set a concurrent limit for HTTP requests.
+ *
+ * This will limit the number of concurrent HTTP requests made by the operator.
+ *
+ * # Arguments
+ *
+ * * `v` - The maximum number of concurrent HTTP requests allowed.
+ */
+ set httpPermits(v: number)
+ /**
+ * Build the layer.
+ *
+ * Returns an `External<Layer>` that can be used with `Operator.layer()`.
+ */
+ build(): ExternalObject<Layer>
+}
+
/** Entry returned by Lister or BlockingLister to represent a path, and it's a relative metadata. */
export declare class Entry {
/** Return the path of this entry. */
@@ -702,7 +761,7 @@ export declare class Reader {
* # Examples
*
* ```javascript
- * const op = new Operator("file", { root: "/tmp" })
+ * const op = new Operator("fs", { root: "/tmp" })
*
* const retry = new RetryLayer();
* retry.max_times = 3;
diff --git a/bindings/nodejs/generated.js b/bindings/nodejs/generated.js
index e019e715f..a937b9d68 100644
--- a/bindings/nodejs/generated.js
+++ b/bindings/nodejs/generated.js
@@ -531,6 +531,7 @@ module.exports.BlockingLister = nativeBinding.BlockingLister
module.exports.BlockingReader = nativeBinding.BlockingReader
module.exports.BlockingWriter = nativeBinding.BlockingWriter
module.exports.Capability = nativeBinding.Capability
+module.exports.ConcurrentLimitLayer = nativeBinding.ConcurrentLimitLayer
module.exports.Entry = nativeBinding.Entry
module.exports.Layer = nativeBinding.Layer
module.exports.Lister = nativeBinding.Lister
diff --git a/bindings/nodejs/index.cjs b/bindings/nodejs/index.cjs
index 9b17b4685..f976edc82 100644
--- a/bindings/nodejs/index.cjs
+++ b/bindings/nodejs/index.cjs
@@ -119,7 +119,15 @@ class BlockingWriteStream extends Writable {
}
}
-const { Operator, RetryLayer, BlockingReader, Reader, BlockingWriter, Writer } = require('./generated.js')
+const {
+ Operator,
+ RetryLayer,
+ ConcurrentLimitLayer,
+ BlockingReader,
+ Reader,
+ BlockingWriter,
+ Writer,
+} = require('./generated.js')
BlockingReader.prototype.createReadStream = function (options) {
return new BlockingReadStream(this, options)
@@ -140,4 +148,5 @@ Writer.prototype.createWriteStream = function (options) {
module.exports.Operator = Operator
module.exports.layers = {
RetryLayer,
+ ConcurrentLimitLayer,
}
diff --git a/bindings/nodejs/index.mjs b/bindings/nodejs/index.mjs
index ccafda569..1eac4ec77 100644
--- a/bindings/nodejs/index.mjs
+++ b/bindings/nodejs/index.mjs
@@ -120,7 +120,7 @@ class BlockingWriteStream extends Writable {
}
import * as generated from './generated.js'
-const { Operator, RetryLayer, BlockingReader, Reader, BlockingWriter, Writer } = generated
+const { Operator, RetryLayer, ConcurrentLimitLayer, BlockingReader, Reader, BlockingWriter, Writer } = generated
BlockingReader.prototype.createReadStream = function (options) {
return new BlockingReadStream(this, options)
@@ -140,6 +140,7 @@ Writer.prototype.createWriteStream = function (options) {
export const layers = {
RetryLayer,
+ ConcurrentLimitLayer,
}
export * from './generated.js'
diff --git a/bindings/nodejs/src/layer.rs b/bindings/nodejs/src/layer.rs
new file mode 100644
index 000000000..80b2c65d7
--- /dev/null
+++ b/bindings/nodejs/src/layer.rs
@@ -0,0 +1,235 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::time::Duration;
+
+use napi::bindgen_prelude::External;
+
+pub trait NodeLayer: Send + Sync {
+ fn layer(&self, op: opendal::Operator) -> opendal::Operator;
+}
+
+/// A public layer wrapper
+#[napi]
+pub struct Layer {
+ pub(crate) inner: Box<dyn NodeLayer>,
+}
+
+impl NodeLayer for opendal::layers::RetryLayer {
+ fn layer(&self, op: opendal::Operator) -> opendal::Operator {
+ op.layer(self.clone())
+ }
+}
+
+/// Retry layer
+///
+/// Add retry for temporary failed operations.
+///
+/// # Notes
+///
+/// This layer will retry failed operations when [`Error::is_temporary`]
+/// returns true.
+/// If the operation still failed, this layer will set error to
+/// `Persistent` which means error has been retried.
+///
+/// `write` and `blocking_write` don't support retry so far,
+/// visit [this issue](https://github.com/apache/opendal/issues/1223) for more details.
+///
+/// # Examples
+///
+/// ```javascript
+/// const op = new Operator("fs", { root: "/tmp" })
+///
+/// const retry = new RetryLayer();
+/// retry.maxTimes = 3;
+/// retry.jitter = true;
+///
+/// op.layer(retry.build());
+/// ```
+#[derive(Default)]
+#[napi]
+pub struct RetryLayer {
+ jitter: bool,
+ max_times: Option<u32>,
+ factor: Option<f64>,
+ max_delay: Option<f64>,
+ min_delay: Option<f64>,
+}
+
+#[napi]
+impl RetryLayer {
+ #[napi(constructor)]
+ pub fn new() -> Self {
+ Self::default()
+ }
+
+ /// Set jitter of current backoff.
+ ///
+ /// If jitter is enabled, ExponentialBackoff will add a random jitter in `[0, min_delay)`
+ /// to current delay.
+ #[napi(setter)]
+ pub fn jitter(&mut self, v: bool) {
+ self.jitter = v;
+ }
+
+ /// Set max_times of current backoff.
+ ///
+ /// Backoff will return `None` if max times are reached.
+ #[napi(setter)]
+ pub fn max_times(&mut self, v: u32) {
+ self.max_times = Some(v);
+ }
+
+ /// Set factor of current backoff.
+ ///
+ /// # Panics
+ ///
+ /// This function will panic if the input factor is smaller than `1.0`.
+ #[napi(setter)]
+ pub fn factor(&mut self, v: f64) {
+ self.factor = Some(v);
+ }
+
+ /// Set max_delay of current backoff.
+ ///
+ /// Delay will not increase if the current delay is larger than max_delay.
+ ///
+ /// # Notes
+ ///
+ /// - The unit of max_delay is millisecond.
+ #[napi(setter)]
+ pub fn max_delay(&mut self, v: f64) {
+ self.max_delay = Some(v);
+ }
+
+ /// Set min_delay of current backoff.
+ ///
+ /// # Notes
+ ///
+ /// - The unit of min_delay is millisecond.
+ #[napi(setter)]
+ pub fn min_delay(&mut self, v: f64) {
+ self.min_delay = Some(v);
+ }
+
+ #[napi]
+ pub fn build(&self) -> External<Layer> {
+ let mut l = opendal::layers::RetryLayer::default();
+ if self.jitter {
+ l = l.with_jitter();
+ }
+ if let Some(max_times) = self.max_times {
+ l = l.with_max_times(max_times as usize);
+ }
+ if let Some(factor) = self.factor {
+ l = l.with_factor(factor as f32);
+ }
+ if let Some(max_delay) = self.max_delay {
+ l = l.with_max_delay(Duration::from_millis(max_delay as u64));
+ }
+ if let Some(min_delay) = self.min_delay {
+ l = l.with_min_delay(Duration::from_millis(min_delay as u64));
+ }
+
+ External::new(Layer { inner: Box::new(l) })
+ }
+}
+
+impl NodeLayer for opendal::layers::ConcurrentLimitLayer {
+ fn layer(&self, op: opendal::Operator) -> opendal::Operator {
+ op.layer(self.clone())
+ }
+}
+
+/// Concurrent limit layer
+///
+/// Add concurrent request limit.
+///
+/// # Notes
+///
+/// Users can control how many concurrent connections could be established
+/// between OpenDAL and underlying storage services.
+///
+/// All operators wrapped by this layer will share a common semaphore.
+///
+/// # Examples
+///
+/// ```javascript
+/// const op = new Operator("fs", { root: "/tmp" })
+///
+/// // Create a concurrent limit layer with 1024 permits
+/// const limit = new ConcurrentLimitLayer(1024);
+/// op.layer(limit.build());
+/// ```
+///
+/// With HTTP concurrent limit:
+///
+/// ```javascript
+/// const limit = new ConcurrentLimitLayer(1024);
+/// limit.httpPermits = 512;
+/// op.layer(limit.build());
+/// ```
+#[napi]
+pub struct ConcurrentLimitLayer {
+ permits: i64,
+ http_permits: Option<i64>,
+}
+
+#[napi]
+impl ConcurrentLimitLayer {
+ /// Create a new ConcurrentLimitLayer with specified permits.
+ ///
+ /// These permits will be applied to all operations.
+ ///
+ /// # Arguments
+ ///
+ /// * `permits` - The maximum number of concurrent operations allowed.
+ #[napi(constructor)]
+ pub fn new(permits: i64) -> Self {
+ Self {
+ permits,
+ http_permits: None,
+ }
+ }
+
+ /// Set a concurrent limit for HTTP requests.
+ ///
+ /// This will limit the number of concurrent HTTP requests made by the operator.
+ ///
+ /// # Arguments
+ ///
+ /// * `v` - The maximum number of concurrent HTTP requests allowed.
+ #[napi(setter)]
+ pub fn http_permits(&mut self, v: i64) {
+ self.http_permits = Some(v);
+ }
+
+ /// Build the layer.
+ ///
+ /// Returns an `External<Layer>` that can be used with `Operator.layer()`.
+ #[napi]
+ pub fn build(&self) -> External<Layer> {
+ let permits = self.permits;
+ let mut l = opendal::layers::ConcurrentLimitLayer::new(permits as usize);
+
+ if let Some(http_permits) = self.http_permits {
+ l = l.with_http_concurrent_limit(http_permits as usize);
+ }
+
+ External::new(Layer { inner: Box::new(l) })
+ }
+}
diff --git a/bindings/nodejs/src/lib.rs b/bindings/nodejs/src/lib.rs
index 736bc1e9e..932539ea6 100644
--- a/bindings/nodejs/src/lib.rs
+++ b/bindings/nodejs/src/lib.rs
@@ -31,7 +31,10 @@ use opendal::options::{
DeleteOptions, ListOptions, ReadOptions, ReaderOptions, StatOptions,
WriteOptions,
};
+use crate::layer::Layer;
+
mod capability;
+mod layer;
mod options;
#[napi]
@@ -1052,16 +1055,6 @@ impl PresignedRequest {
}
}
-pub trait NodeLayer: Send + Sync {
- fn layer(&self, op: opendal::Operator) -> opendal::Operator;
-}
-
-/// A public layer wrapper
-#[napi]
-pub struct Layer {
- inner: Box<dyn NodeLayer>,
-}
-
#[napi]
impl Operator {
/// Add a layer to this operator.
@@ -1079,126 +1072,6 @@ impl Operator {
}
}
-impl NodeLayer for opendal::layers::RetryLayer {
- fn layer(&self, op: opendal::Operator) -> opendal::Operator {
- op.layer(self.clone())
- }
-}
-
-/// Retry layer
-///
-/// Add retry for temporary failed operations.
-///
-/// # Notes
-///
-/// This layer will retry failed operations when [`Error::is_temporary`]
-/// returns true.
-/// If the operation still failed, this layer will set error to
-/// `Persistent` which means error has been retried.
-///
-/// `write` and `blocking_write` don't support retry so far,
-/// visit [this issue](https://github.com/apache/opendal/issues/1223) for more details.
-///
-/// # Examples
-///
-/// ```javascript
-/// const op = new Operator("file", { root: "/tmp" })
-///
-/// const retry = new RetryLayer();
-/// retry.max_times = 3;
-/// retry.jitter = true;
-///
-/// op.layer(retry.build());
-/// ```
-#[derive(Default)]
-#[napi]
-pub struct RetryLayer {
- jitter: bool,
- max_times: Option<u32>,
- factor: Option<f64>,
- max_delay: Option<f64>,
- min_delay: Option<f64>,
-}
-
-#[napi]
-impl RetryLayer {
- #[napi(constructor)]
- pub fn new() -> Self {
- Self::default()
- }
-
- /// Set jitter of current backoff.
- ///
- /// If jitter is enabled, ExponentialBackoff will add a random jitter in `[0, min_delay)`
- /// to current delay.
- #[napi(setter)]
- pub fn jitter(&mut self, v: bool) {
- self.jitter = v;
- }
-
- /// Set max_times of current backoff.
- ///
- /// Backoff will return `None` if max times are reached.
- #[napi(setter)]
- pub fn max_times(&mut self, v: u32) {
- self.max_times = Some(v);
- }
-
- /// Set factor of current backoff.
- ///
- /// # Panics
- ///
- /// This function will panic if the input factor is smaller than `1.0`.
- #[napi(setter)]
- pub fn factor(&mut self, v: f64) {
- self.factor = Some(v);
- }
-
- /// Set max_delay of current backoff.
- ///
- /// Delay will not increase if the current delay is larger than max_delay.
- ///
- /// # Notes
- ///
- /// - The unit of max_delay is millisecond.
- #[napi(setter)]
- pub fn max_delay(&mut self, v: f64) {
- self.max_delay = Some(v);
- }
-
- /// Set min_delay of current backoff.
- ///
- /// # Notes
- ///
- /// - The unit of min_delay is millisecond.
- #[napi(setter)]
- pub fn min_delay(&mut self, v: f64) {
- self.min_delay = Some(v);
- }
-
- #[napi]
- pub fn build(&self) -> External<Layer> {
- let mut l = opendal::layers::RetryLayer::default();
- if self.jitter {
- l = l.with_jitter();
- }
- if let Some(max_times) = self.max_times {
- l = l.with_max_times(max_times as usize);
- }
- if let Some(factor) = self.factor {
- l = l.with_factor(factor as f32);
- }
- if let Some(max_delay) = self.max_delay {
- l = l.with_max_delay(Duration::from_millis(max_delay as u64));
- }
- if let Some(min_delay) = self.min_delay {
- l = l.with_min_delay(Duration::from_millis(min_delay as u64));
- }
-
- External::new(Layer { inner: Box::new(l) })
- }
-}
-
/// Format opendal error to napi error.
///
/// FIXME: handle error correctly.
diff --git a/bindings/nodejs/tests/suites/index.mjs b/bindings/nodejs/tests/suites/index.mjs
index f7848c2dd..791b15c48 100644
--- a/bindings/nodejs/tests/suites/index.mjs
+++ b/bindings/nodejs/tests/suites/index.mjs
@@ -24,6 +24,7 @@ import { checkRandomRootEnabled, generateRandomRoot, loadConfigFromEnv } from '.
import { run as AsyncIOTestRun } from './async.suite.mjs'
import { run as ServicesTestRun } from './services.suite.mjs'
import { run as SyncIOTestRun } from './sync.suite.mjs'
+import { run as LayerTestRun } from './layer.suite.mjs'
import { run as AsyncStatOptionsTestRun } from './asyncStatOptions.suite.mjs'
import { run as SyncStatOptionsTestRun } from './syncStatOptions.suite.mjs'
import { run as AsyncReadOptionsTestRun } from './asyncReadOptions.suite.mjs'
@@ -63,6 +64,7 @@ export function runner(testName, scheme) {
AsyncIOTestRun(operator)
ServicesTestRun(operator)
SyncIOTestRun(operator)
+ LayerTestRun(operator)
AsyncStatOptionsTestRun(operator)
SyncStatOptionsTestRun(operator)
AsyncReadOptionsTestRun(operator)
diff --git a/bindings/nodejs/tests/suites/layer.suite.mjs b/bindings/nodejs/tests/suites/layer.suite.mjs
new file mode 100644
index 000000000..dc9f0307c
--- /dev/null
+++ b/bindings/nodejs/tests/suites/layer.suite.mjs
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import { test, assert } from 'vitest'
+
+import { RetryLayer, ConcurrentLimitLayer } from '../../index.mjs'
+
+/**
+ * @param {import("../../index").Operator} op
+ */
+export function run(op) {
+ test('test operator with retry layer', () => {
+ const retryLayer = new RetryLayer()
+ retryLayer.maxTimes = 3
+ retryLayer.jitter = true
+
+ const layerOp = op.layer(retryLayer.build())
+ assert.ok(layerOp)
+ assert.ok(layerOp.capability())
+ })
+
+ test('test operator with concurrent limit layer', () => {
+ const concurrentLimitLayer = new ConcurrentLimitLayer(1024)
+ const layerOp = op.layer(concurrentLimitLayer.build())
+
+ assert.ok(layerOp)
+ assert.ok(layerOp.capability())
+ })
+
+ test('test operator with concurrent limit layer and http permits', () => {
+ const concurrentLimitLayer = new ConcurrentLimitLayer(1024)
+ concurrentLimitLayer.httpPermits = 512
+
+ const layerOp = op.layer(concurrentLimitLayer.build())
+
+ assert.ok(layerOp)
+ assert.ok(layerOp.capability())
+ })
+}