This is an automated email from the ASF dual-hosted git repository.
pingtimeout pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris-tools.git
The following commit(s) were added to refs/heads/main by this push:
new 5590740 feat: introduce SetupActions class (#90)
5590740 is described below
commit 5590740cfa1591d6e3a3f7363b897dfcd73568c2
Author: Artur Rakhmatulin <[email protected]>
AuthorDate: Wed Dec 10 16:41:13 2025 +0000
feat: introduce SetupActions class (#90)
- code deduplication
- introduce SetupActions class
- introduce AuthParameters
- enhance auth-flow configuration
- decouple auth-parameters from ConnectionParameters
- moving governaning accessToken logic into SetupActions
---
benchmarks/README.md | 11 +-
.../src/gatling/resources/benchmark-defaults.conf | 9 ++
.../benchmarks/actions/AuthenticationActions.scala | 30 ++---
.../polaris/benchmarks/actions/SetupActions.scala | 126 +++++++++++++++++++++
...ectionParameters.scala => AuthParameters.scala} | 22 ++--
.../benchmarks/parameters/BenchmarkConfig.scala | 11 +-
.../parameters/ConnectionParameters.scala | 6 +-
.../benchmarks/simulations/CreateCommits.scala | 56 ++-------
.../benchmarks/simulations/CreateTreeDataset.scala | 58 +++-------
.../benchmarks/simulations/ReadTreeDataset.scala | 61 +++-------
.../simulations/ReadUpdateTreeDataset.scala | 51 ++-------
.../WeightedWorkloadOnTreeDataset.scala | 40 ++-----
12 files changed, 244 insertions(+), 237 deletions(-)
diff --git a/benchmarks/README.md b/benchmarks/README.md
index 954f6f5..a3dc868 100644
--- a/benchmarks/README.md
+++ b/benchmarks/README.md
@@ -57,16 +57,25 @@ dataset.tree {
### Connection Parameters
-Connection settings are configured under `http` and `auth`:
+Connection settings are configured under `http`:
```hocon
http {
base-url = "http://localhost:8181" # Service URL
}
+```
+
+### Authentication Parameters
+Authentication settings are configured under `auth`:
+
+```hocon
auth {
client-id = null # Required: OAuth2 client ID
client-secret = null # Required: OAuth2 client secret
+ max-retries = 10 # Maximum number of retry attempts for authentication
failures
+ retryable-http-codes = [500] # HTTP status codes that should trigger a retry
+ refresh-interval-seconds = 60 # Refresh interval for the authentication
token in seconds
}
```
diff --git a/benchmarks/src/gatling/resources/benchmark-defaults.conf
b/benchmarks/src/gatling/resources/benchmark-defaults.conf
index 7106536..9caeec0 100644
--- a/benchmarks/src/gatling/resources/benchmark-defaults.conf
+++ b/benchmarks/src/gatling/resources/benchmark-defaults.conf
@@ -33,6 +33,15 @@ auth {
# OAuth2 client secret for authentication
# Required: Must be provided in configuration
client-secret = null
+
+ # Refresh interval for the authentication token in seconds
+ refresh-interval-seconds = 60
+
+ # Maximum number of retry attempts for authentication failures
+ max-retries = 10
+
+ # HTTP status codes that should trigger a retry
+ retryable-http-codes = [500]
}
# Dataset tree structure configuration
diff --git
a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/actions/AuthenticationActions.scala
b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/actions/AuthenticationActions.scala
index 8c8b04e..8228ecc 100644
---
a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/actions/AuthenticationActions.scala
+++
b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/actions/AuthenticationActions.scala
@@ -26,7 +26,7 @@ import org.apache.polaris.benchmarks.RetryOnHttpCodes.{
retryOnHttpStatus,
HttpRequestBuilderWithStatusSave
}
-import org.apache.polaris.benchmarks.parameters.ConnectionParameters
+import org.apache.polaris.benchmarks.parameters.{AuthParameters,
ConnectionParameters}
import org.slf4j.LoggerFactory
import java.util.concurrent.atomic.AtomicReference
@@ -35,30 +35,28 @@ import java.util.concurrent.atomic.AtomicReference
* Actions for performance testing authentication operations. This class
provides methods to
* authenticate and manage access tokens for API requests.
*
- * @param cp Connection parameters containing client credentials
+ * @param cp Connection parameters containing the base URL
+ * @param ap Authentication parameters
* @param accessToken Reference to the authentication token shared across
actions
- * @param maxRetries Maximum number of retry attempts for failed operations
- * @param retryableHttpCodes HTTP status codes that should trigger a retry
*/
case class AuthenticationActions(
cp: ConnectionParameters,
- accessToken: AtomicReference[String],
- maxRetries: Int = 10,
- retryableHttpCodes: Set[Int] = Set(500)
+ ap: AuthParameters,
+ accessToken: AtomicReference[String]
) {
private val logger = LoggerFactory.getLogger(getClass)
/**
* Creates a Gatling Feeder that provides authentication credentials. The
feeder continuously
- * supplies client ID and client secret from the connection parameters for
use in authentication
- * requests.
+ * supplies client ID and client secret from the authentication parameters
for use in
+ * authentication requests.
*
* @return An iterator providing client credentials
*/
def feeder(): Feeder[String] = Iterator.continually(
Map(
- "clientId" -> cp.clientId,
- "clientSecret" -> cp.clientSecret
+ "clientId" -> ap.clientId,
+ "clientSecret" -> ap.clientSecret
)
)
@@ -71,7 +69,7 @@ case class AuthenticationActions(
* There is no limit to the maximum number of users that can authenticate
concurrently.
*/
val authenticateAndSaveAccessToken: ChainBuilder =
- retryOnHttpStatus(maxRetries, retryableHttpCodes, "Authenticate")(
+ retryOnHttpStatus(ap.maxRetries, ap.retryableHttpCodes, "Authenticate")(
http("Authenticate")
.post("/api/catalog/v1/oauth/tokens")
.header("Content-Type", "application/x-www-form-urlencoded")
@@ -89,12 +87,4 @@ case class AuthenticationActions(
}
session
}
-
- /**
- * Restores the current access token from the shared reference into the
Gatling session. This
- * operation is useful when a scenario needs to reuse an authentication
token from a previous
- * scenario.
- */
- val restoreAccessTokenInSession: ChainBuilder =
- exec(session => session.set("accessToken", accessToken.get()))
}
diff --git
a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/actions/SetupActions.scala
b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/actions/SetupActions.scala
new file mode 100644
index 0000000..458ced4
--- /dev/null
+++
b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/actions/SetupActions.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.benchmarks.actions
+
+import io.gatling.core.Predef._
+import io.gatling.core.structure.{ChainBuilder, ScenarioBuilder}
+import org.apache.polaris.benchmarks.parameters.{AuthParameters,
ConnectionParameters}
+
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
+import scala.concurrent.duration._
+
+/**
+ * Actions for setting up necessary shared states like authentication across
benchmark simulations.
+ *
+ * @param cp Connection parameters containing the base URL
+ * @param ap Authentication parameters containing client credentials and retry
settings
+ */
+case class SetupActions(
+ cp: ConnectionParameters,
+ ap: AuthParameters
+) {
+
+ /**
+ * Shared access token reference that can be passed to all action classes.
+ */
+ val accessToken: AtomicReference[String] = new AtomicReference()
+
+ /**
+ * Internal flag to control the token refresh loop.
+ */
+ private val shouldRefreshToken: AtomicBoolean = new AtomicBoolean(true)
+
+ /**
+ * Authentication actions instance that handles the actual OAuth token
operations.
+ */
+ private val authActions: AuthenticationActions =
+ AuthenticationActions(cp, ap, accessToken)
+
+ /**
+ * Continuously refreshes the OAuth token at the configured interval
specified in
+ * [[AuthParameters.refreshIntervalSeconds]]. This scenario runs
indefinitely in a loop controlled
+ * by the shouldRefreshToken flag until [[stopRefreshingToken]] is called.
+ *
+ * @return ScenarioBuilder that continuously refreshes the token
+ */
+ def continuouslyRefreshOauthToken(): ScenarioBuilder = {
+ val interval = ap.refreshIntervalSeconds.seconds
+ scenario(s"Authenticate every ${interval.toSeconds}s using the Iceberg
REST API")
+ .asLongAs(_ => shouldRefreshToken.get()) {
+ feed(authActions.feeder())
+ .exec(authActions.authenticateAndSaveAccessToken)
+ .pause(interval)
+ }
+ }
+
+ /**
+ * Refreshes the OAuth token at the configured interval specified in
+ * [[AuthParameters.refreshIntervalSeconds]] for a specified duration. Unlike
+ * [[continuouslyRefreshOauthToken]], this method automatically stops after
the duration expires
+ * without requiring an explicit stop call.
+ *
+ * @param duration Total duration to keep refreshing the token
+ * @return ScenarioBuilder that refreshes the token for the specified
duration
+ */
+ def refreshOauthForDuration(duration: FiniteDuration): ScenarioBuilder = {
+ val interval = ap.refreshIntervalSeconds.seconds
+ scenario(s"Authenticate every ${interval.toSeconds}s using the Iceberg
REST API")
+ .during(duration) {
+ feed(authActions.feeder())
+ .exec(authActions.authenticateAndSaveAccessToken)
+ .pause(interval)
+ }
+ }
+
+ /**
+ * Waits for the authentication token to be available before proceeding.
This is useful when the
+ * authentication is performed by a separate scenario. This operation does
not make any network
+ * requests.
+ *
+ * @return ScenarioBuilder that waits for token availability
+ */
+ val waitForAuthentication: ScenarioBuilder =
+ scenario("Wait for the authentication token to be available")
+ .asLongAs(_ => accessToken.get() == null) {
+ pause(1.second)
+ }
+
+ /**
+ * Stops the token refresh loop. This is useful when the authentication is
performed by a separate
+ * scenario. This operation does not make any network requests.
+ *
+ * @return ScenarioBuilder that stops the token refresh
+ */
+ val stopRefreshingToken: ScenarioBuilder =
+ scenario("Stop refreshing the authentication token")
+ .exec { session =>
+ shouldRefreshToken.set(false)
+ session
+ }
+
+ /**
+ * Restores the access token from the shared reference to the Gatling
session. This is useful when
+ * the authentication is performed by a separate scenario. This operation
does not make any
+ * network requests.
+ *
+ * @return ChainBuilder that restores the token to the session
+ */
+ val restoreAccessTokenInSession: ChainBuilder =
+ exec(session => session.set("accessToken", accessToken.get()))
+}
diff --git
a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/ConnectionParameters.scala
b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/AuthParameters.scala
similarity index 62%
copy from
benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/ConnectionParameters.scala
copy to
benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/AuthParameters.scala
index f227ca8..f6f8733 100644
---
a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/ConnectionParameters.scala
+++
b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/AuthParameters.scala
@@ -19,18 +19,24 @@
package org.apache.polaris.benchmarks.parameters
/**
- * Case class to hold the connection parameters for the benchmark.
+ * Case class to hold the authentication parameters for the benchmark.
*
* @param clientId The client ID for authentication.
* @param clientSecret The client secret for authentication.
- * @param baseUrl The base URL of the Polaris service.
+ * @param refreshIntervalSeconds Refresh interval for the authentication token
in seconds.
+ * @param maxRetries Maximum number of retry attempts for authentication
failures.
+ * @param retryableHttpCodes HTTP status codes that should trigger a retry.
*/
-case class ConnectionParameters(clientId: String, clientSecret: String,
baseUrl: String) {
+case class AuthParameters(
+ clientId: String,
+ clientSecret: String,
+ refreshIntervalSeconds: Int,
+ maxRetries: Int,
+ retryableHttpCodes: Set[Int]
+) {
require(clientId != null && clientId.nonEmpty, "Client ID cannot be null or
empty")
require(clientSecret != null && clientSecret.nonEmpty, "Client secret cannot
be null or empty")
- require(baseUrl != null && baseUrl.nonEmpty, "Base URL cannot be null or
empty")
- require(
- baseUrl.startsWith("http://") || baseUrl.startsWith("https://"),
- "Base URL must start with http:// or https://"
- )
+ require(refreshIntervalSeconds > 0, "Refresh interval must be positive")
+ require(maxRetries >= 0, "Max retries cannot be negative")
+ require(retryableHttpCodes != null, "Retryable HTTP codes cannot be null")
}
diff --git
a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/BenchmarkConfig.scala
b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/BenchmarkConfig.scala
index 1674127..2865768 100644
---
a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/BenchmarkConfig.scala
+++
b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/BenchmarkConfig.scala
@@ -32,9 +32,15 @@ object BenchmarkConfig {
val workload: Config = config.getConfig("workload")
val connectionParams = ConnectionParameters(
+ http.getString("base-url")
+ )
+
+ val authParams = AuthParameters(
auth.getString("client-id"),
auth.getString("client-secret"),
- http.getString("base-url")
+ auth.getInt("refresh-interval-seconds"),
+ auth.getInt("max-retries"),
+
auth.getIntList("retryable-http-codes").toArray.map(_.asInstanceOf[Int]).toSet
)
val workloadParams = {
@@ -89,12 +95,13 @@ object BenchmarkConfig {
dataset.getString("storage-config-info")
)
- BenchmarkConfig(connectionParams, workloadParams, datasetParams)
+ BenchmarkConfig(connectionParams, authParams, workloadParams,
datasetParams)
}
}
case class BenchmarkConfig(
connectionParameters: ConnectionParameters,
+ authParameters: AuthParameters,
workloadParameters: WorkloadParameters,
datasetParameters: DatasetParameters
) {}
diff --git
a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/ConnectionParameters.scala
b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/ConnectionParameters.scala
index f227ca8..b047fb3 100644
---
a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/ConnectionParameters.scala
+++
b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/ConnectionParameters.scala
@@ -21,13 +21,9 @@ package org.apache.polaris.benchmarks.parameters
/**
* Case class to hold the connection parameters for the benchmark.
*
- * @param clientId The client ID for authentication.
- * @param clientSecret The client secret for authentication.
* @param baseUrl The base URL of the Polaris service.
*/
-case class ConnectionParameters(clientId: String, clientSecret: String,
baseUrl: String) {
- require(clientId != null && clientId.nonEmpty, "Client ID cannot be null or
empty")
- require(clientSecret != null && clientSecret.nonEmpty, "Client secret cannot
be null or empty")
+case class ConnectionParameters(baseUrl: String) {
require(baseUrl != null && baseUrl.nonEmpty, "Base URL cannot be null or
empty")
require(
baseUrl.startsWith("http://") || baseUrl.startsWith("https://"),
diff --git
a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/CreateCommits.scala
b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/CreateCommits.scala
index 6f1749c..0c5254e 100644
---
a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/CreateCommits.scala
+++
b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/CreateCommits.scala
@@ -21,14 +21,10 @@ package org.apache.polaris.benchmarks.simulations
import io.gatling.core.Predef._
import io.gatling.core.structure.ScenarioBuilder
import io.gatling.http.Predef.http
-import org.apache.polaris.benchmarks.actions.{
- AuthenticationActions,
- NamespaceActions,
- TableActions,
- ViewActions
-}
+import org.apache.polaris.benchmarks.actions.{SetupActions, TableActions,
ViewActions}
import org.apache.polaris.benchmarks.parameters.BenchmarkConfig.config
import org.apache.polaris.benchmarks.parameters.{
+ AuthParameters,
ConnectionParameters,
DatasetParameters,
WorkloadParameters
@@ -36,7 +32,6 @@ import org.apache.polaris.benchmarks.parameters.{
import org.apache.polaris.benchmarks.util.CircularIterator
import org.slf4j.LoggerFactory
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
import scala.concurrent.duration._
class CreateCommits extends Simulation {
@@ -46,45 +41,16 @@ class CreateCommits extends Simulation {
// Load parameters
//
--------------------------------------------------------------------------------
val cp: ConnectionParameters = config.connectionParameters
+ val ap: AuthParameters = config.authParameters
val dp: DatasetParameters = config.datasetParameters
val wp: WorkloadParameters = config.workloadParameters
//
--------------------------------------------------------------------------------
// Helper values
//
--------------------------------------------------------------------------------
- private val accessToken: AtomicReference[String] = new AtomicReference()
- private val shouldRefreshToken: AtomicBoolean = new AtomicBoolean(true)
-
- private val authActions = AuthenticationActions(cp, accessToken)
- private val tableActions = TableActions(dp, wp, accessToken)
- private val viewActions = ViewActions(dp, wp, accessToken)
-
- //
--------------------------------------------------------------------------------
- // Authentication related workloads:
- // * Authenticate and store the access token for later use every minute
- // * Wait for an OAuth token to be available
- // * Stop the token refresh loop
- //
--------------------------------------------------------------------------------
- val continuouslyRefreshOauthToken: ScenarioBuilder =
- scenario("Authenticate every minute using the Iceberg REST API")
- .asLongAs(_ => shouldRefreshToken.get())(
- feed(authActions.feeder())
- .exec(authActions.authenticateAndSaveAccessToken)
- .pause(1.minute)
- )
-
- val waitForAuthentication: ScenarioBuilder =
- scenario("Wait for the authentication token to be available")
- .asLongAs(_ => accessToken.get() == null)(
- pause(1.second)
- )
-
- val stopRefreshingToken: ScenarioBuilder =
- scenario("Stop refreshing the authentication token")
- .exec { session =>
- shouldRefreshToken.set(false)
- session
- }
+ private val setupActions = SetupActions(cp, ap)
+ private val tableActions = TableActions(dp, wp, setupActions.accessToken)
+ private val viewActions = ViewActions(dp, wp, setupActions.accessToken)
//
--------------------------------------------------------------------------------
// Read and write workloads:
@@ -93,7 +59,7 @@ class CreateCommits extends Simulation {
//
--------------------------------------------------------------------------------
val tableUpdateScenario: ScenarioBuilder =
scenario("Create table commits by updating properties")
- .exec(authActions.restoreAccessTokenInSession)
+ .exec(setupActions.restoreAccessTokenInSession)
.feed(tableActions.propertyUpdateFeeder())
.exec(tableActions.updateTable)
@@ -104,7 +70,7 @@ class CreateCommits extends Simulation {
//
--------------------------------------------------------------------------------
val viewUpdateScenario: ScenarioBuilder =
scenario("Create view commits by updating properties")
- .exec(authActions.restoreAccessTokenInSession)
+ .exec(setupActions.restoreAccessTokenInSession)
.feed(viewActions.propertyUpdateFeeder())
.exec(viewActions.updateView)
@@ -117,8 +83,8 @@ class CreateCommits extends Simulation {
private val viewCommitsThroughput = wp.createCommits.viewCommitsThroughput
private val durationInMinutes = wp.createCommits.durationInMinutes
setUp(
-
continuouslyRefreshOauthToken.inject(atOnceUsers(1)).protocols(httpProtocol),
- waitForAuthentication
+
setupActions.continuouslyRefreshOauthToken().inject(atOnceUsers(1)).protocols(httpProtocol),
+ setupActions.waitForAuthentication
.inject(atOnceUsers(1))
.andThen(
tableUpdateScenario
@@ -132,6 +98,6 @@ class CreateCommits extends Simulation {
)
.protocols(httpProtocol)
)
-
.andThen(stopRefreshingToken.inject(atOnceUsers(1)).protocols(httpProtocol))
+
.andThen(setupActions.stopRefreshingToken.inject(atOnceUsers(1)).protocols(httpProtocol))
)
}
diff --git
a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/CreateTreeDataset.scala
b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/CreateTreeDataset.scala
index ae04756..bec73a7 100644
---
a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/CreateTreeDataset.scala
+++
b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/CreateTreeDataset.scala
@@ -24,13 +24,14 @@ import io.gatling.http.Predef._
import org.apache.polaris.benchmarks.actions._
import org.apache.polaris.benchmarks.parameters.BenchmarkConfig.config
import org.apache.polaris.benchmarks.parameters.{
+ AuthParameters,
ConnectionParameters,
DatasetParameters,
WorkloadParameters
}
import org.slf4j.LoggerFactory
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger,
AtomicReference}
+import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration._
/**
@@ -44,6 +45,7 @@ class CreateTreeDataset extends Simulation {
// Load parameters
//
--------------------------------------------------------------------------------
val cp: ConnectionParameters = config.connectionParameters
+ val ap: AuthParameters = config.authParameters
val dp: DatasetParameters = config.datasetParameters
val wp: WorkloadParameters = config.workloadParameters
@@ -51,53 +53,23 @@ class CreateTreeDataset extends Simulation {
// Helper values
//
--------------------------------------------------------------------------------
private val numNamespaces: Int = dp.nAryTree.numberOfNodes
- private val accessToken: AtomicReference[String] = new AtomicReference()
- private val shouldRefreshToken: AtomicBoolean = new AtomicBoolean(true)
-
- private val authenticationActions = AuthenticationActions(cp, accessToken,
5, Set(500))
- private val catalogActions = CatalogActions(dp, accessToken, 0, Set())
- private val namespaceActions = NamespaceActions(dp, wp, accessToken, 5,
Set(500))
- private val tableActions = TableActions(dp, wp, accessToken, 0, Set())
- private val viewActions = ViewActions(dp, wp, accessToken, 0, Set())
+ private val setupActions = SetupActions(cp, ap)
+ private val catalogActions = CatalogActions(dp, setupActions.accessToken, 0,
Set())
+ private val namespaceActions = NamespaceActions(dp, wp,
setupActions.accessToken, 5, Set(500))
+ private val tableActions = TableActions(dp, wp, setupActions.accessToken, 0,
Set())
+ private val viewActions = ViewActions(dp, wp, setupActions.accessToken, 0,
Set())
private val createdCatalogs = new AtomicInteger()
private val createdNamespaces = new AtomicInteger()
private val createdTables = new AtomicInteger()
private val createdViews = new AtomicInteger()
- //
--------------------------------------------------------------------------------
- // Authentication related workloads:
- // * Authenticate and store the access token for later use every minute
- // * Wait for an OAuth token to be available
- // * Stop the token refresh loop
- //
--------------------------------------------------------------------------------
- val continuouslyRefreshOauthToken: ScenarioBuilder =
- scenario("Authenticate every minute using the Iceberg REST API")
- .asLongAs(_ => shouldRefreshToken.get()) {
- feed(authenticationActions.feeder())
- .exec(authenticationActions.authenticateAndSaveAccessToken)
- .pause(1.minute)
- }
-
- val waitForAuthentication: ScenarioBuilder =
- scenario("Wait for the authentication token to be available")
- .asLongAs(_ => accessToken.get() == null) {
- pause(1.second)
- }
-
- val stopRefreshingToken: ScenarioBuilder =
- scenario("Stop refreshing the authentication token")
- .exec { session =>
- shouldRefreshToken.set(false)
- session
- }
-
//
--------------------------------------------------------------------------------
// Workload: Create catalogs
//
--------------------------------------------------------------------------------
val createCatalogs: ScenarioBuilder =
scenario("Create catalogs using the Polaris Management REST API")
- .exec(authenticationActions.restoreAccessTokenInSession)
+ .exec(setupActions.restoreAccessTokenInSession)
.asLongAs(session =>
createdCatalogs.getAndIncrement() < dp.numCatalogs &&
session.contains("accessToken")
)(
@@ -109,7 +81,7 @@ class CreateTreeDataset extends Simulation {
// Workload: Create namespaces
//
--------------------------------------------------------------------------------
val createNamespaces: ScenarioBuilder = scenario("Create namespaces using
the Iceberg REST API")
- .exec(authenticationActions.restoreAccessTokenInSession)
+ .exec(setupActions.restoreAccessTokenInSession)
.asLongAs(session =>
createdNamespaces.getAndIncrement() < numNamespaces &&
session.contains("accessToken")
)(
@@ -121,7 +93,7 @@ class CreateTreeDataset extends Simulation {
// Workload: Create tables
//
--------------------------------------------------------------------------------
val createTables: ScenarioBuilder = scenario("Create tables using the
Iceberg REST API")
- .exec(authenticationActions.restoreAccessTokenInSession)
+ .exec(setupActions.restoreAccessTokenInSession)
.asLongAs(session =>
createdTables.getAndIncrement() < dp.numTables &&
session.contains("accessToken")
)(
@@ -133,7 +105,7 @@ class CreateTreeDataset extends Simulation {
// Workload: Create views
//
--------------------------------------------------------------------------------
val createViews: ScenarioBuilder = scenario("Create views using the Iceberg
REST API")
- .exec(authenticationActions.restoreAccessTokenInSession)
+ .exec(setupActions.restoreAccessTokenInSession)
.asLongAs(session =>
createdViews.getAndIncrement() < dp.numViews &&
session.contains("accessToken")
)(
@@ -155,8 +127,8 @@ class CreateTreeDataset extends Simulation {
private val viewThroughput = wp.createTreeDataset.viewThroughput
setUp(
-
continuouslyRefreshOauthToken.inject(atOnceUsers(1)).protocols(httpProtocol),
- waitForAuthentication
+
setupActions.continuouslyRefreshOauthToken().inject(atOnceUsers(1)).protocols(httpProtocol),
+ setupActions.waitForAuthentication
.inject(atOnceUsers(1))
.andThen(createCatalogs.inject(atOnceUsers(1)).protocols(httpProtocol))
.andThen(
@@ -169,6 +141,6 @@ class CreateTreeDataset extends Simulation {
)
.andThen(createTables.inject(atOnceUsers(tableThroughput)).protocols(httpProtocol))
.andThen(createViews.inject(atOnceUsers(viewThroughput)).protocols(httpProtocol))
-
.andThen(stopRefreshingToken.inject(atOnceUsers(1)).protocols(httpProtocol))
+
.andThen(setupActions.stopRefreshingToken.inject(atOnceUsers(1)).protocols(httpProtocol))
)
}
diff --git
a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/ReadTreeDataset.scala
b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/ReadTreeDataset.scala
index 44a0077..2b8d9e0 100644
---
a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/ReadTreeDataset.scala
+++
b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/ReadTreeDataset.scala
@@ -23,11 +23,11 @@ import io.gatling.core.structure.ScenarioBuilder
import io.gatling.http.Predef._
import org.apache.polaris.benchmarks.actions._
import org.apache.polaris.benchmarks.parameters.BenchmarkConfig.config
-import org.apache.polaris.benchmarks.parameters.WorkloadParameters
+import org.apache.polaris.benchmarks.parameters.{AuthParameters,
WorkloadParameters}
import org.slf4j.LoggerFactory
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger,
AtomicReference}
-import scala.concurrent.duration.DurationInt
+import java.util.concurrent.atomic.AtomicInteger
+import scala.concurrent.duration._
/**
* This simulation is a 100% read workload that fetches a tree dataset in
Polaris. It is intended to
@@ -42,6 +42,7 @@ class ReadTreeDataset extends Simulation {
// Load parameters
//
--------------------------------------------------------------------------------
private val cp = config.connectionParameters
+ private val ap: AuthParameters = config.authParameters
private val dp = config.datasetParameters
val wp: WorkloadParameters = config.workloadParameters
@@ -49,52 +50,22 @@ class ReadTreeDataset extends Simulation {
// Helper values
//
--------------------------------------------------------------------------------
private val numNamespaces: Int = dp.nAryTree.numberOfNodes
- private val accessToken: AtomicReference[String] = new AtomicReference()
- private val shouldRefreshToken: AtomicBoolean = new AtomicBoolean(true)
-
- private val authenticationActions = AuthenticationActions(cp, accessToken)
- private val catalogActions = CatalogActions(dp, accessToken)
- private val namespaceActions = NamespaceActions(dp, wp, accessToken)
- private val tableActions = TableActions(dp, wp, accessToken)
- private val viewActions = ViewActions(dp, wp, accessToken)
+ private val setupActions = SetupActions(cp, ap)
+ private val catalogActions = CatalogActions(dp, setupActions.accessToken)
+ private val namespaceActions = NamespaceActions(dp, wp,
setupActions.accessToken)
+ private val tableActions = TableActions(dp, wp, setupActions.accessToken)
+ private val viewActions = ViewActions(dp, wp, setupActions.accessToken)
private val verifiedCatalogs = new AtomicInteger()
private val verifiedNamespaces = new AtomicInteger()
private val verifiedTables = new AtomicInteger()
private val verifiedViews = new AtomicInteger()
- //
--------------------------------------------------------------------------------
- // Authentication related workloads:
- // * Authenticate and store the access token for later use every minute
- // * Wait for an OAuth token to be available
- // * Stop the token refresh loop
- //
--------------------------------------------------------------------------------
- val continuouslyRefreshOauthToken: ScenarioBuilder =
- scenario("Authenticate every minute using the Iceberg REST API")
- .asLongAs(_ => shouldRefreshToken.get()) {
- feed(authenticationActions.feeder())
- .exec(authenticationActions.authenticateAndSaveAccessToken)
- .pause(1.minute)
- }
-
- val waitForAuthentication: ScenarioBuilder =
- scenario("Wait for the authentication token to be available")
- .asLongAs(_ => accessToken.get() == null) {
- pause(1.second)
- }
-
- val stopRefreshingToken: ScenarioBuilder =
- scenario("Stop refreshing the authentication token")
- .exec { session =>
- shouldRefreshToken.set(false)
- session
- }
-
//
--------------------------------------------------------------------------------
// Workload: Verify each catalog
//
--------------------------------------------------------------------------------
private val verifyCatalogs = scenario("Verify catalogs using the Polaris
Management REST API")
- .exec(authenticationActions.restoreAccessTokenInSession)
+ .exec(setupActions.restoreAccessTokenInSession)
.asLongAs(session =>
verifiedCatalogs.getAndIncrement() < dp.numCatalogs &&
session.contains("accessToken")
)(
@@ -106,7 +77,7 @@ class ReadTreeDataset extends Simulation {
// Workload: Verify namespaces
//
--------------------------------------------------------------------------------
private val verifyNamespaces = scenario("Verify namespaces using the Iceberg
REST API")
- .exec(authenticationActions.restoreAccessTokenInSession)
+ .exec(setupActions.restoreAccessTokenInSession)
.asLongAs(session =>
verifiedNamespaces.getAndIncrement() < numNamespaces &&
session.contains("accessToken")
)(
@@ -120,7 +91,7 @@ class ReadTreeDataset extends Simulation {
// Workload: Verify tables
//
--------------------------------------------------------------------------------
private val verifyTables = scenario("Verify tables using the Iceberg REST
API")
- .exec(authenticationActions.restoreAccessTokenInSession)
+ .exec(setupActions.restoreAccessTokenInSession)
.asLongAs(session =>
verifiedTables.getAndIncrement() < dp.numTables &&
session.contains("accessToken")
)(
@@ -134,7 +105,7 @@ class ReadTreeDataset extends Simulation {
// Workload: Verify views
//
--------------------------------------------------------------------------------
private val verifyViews = scenario("Verify views using the Iceberg REST API")
- .exec(authenticationActions.restoreAccessTokenInSession)
+ .exec(setupActions.restoreAccessTokenInSession)
.asLongAs(session =>
verifiedViews.getAndIncrement() < dp.numViews &&
session.contains("accessToken")
)(
@@ -158,13 +129,13 @@ class ReadTreeDataset extends Simulation {
private val viewThroughput = wp.readTreeDataset.viewThroughput
setUp(
-
continuouslyRefreshOauthToken.inject(atOnceUsers(1)).protocols(httpProtocol),
- waitForAuthentication
+
setupActions.continuouslyRefreshOauthToken().inject(atOnceUsers(1)).protocols(httpProtocol),
+ setupActions.waitForAuthentication
.inject(atOnceUsers(1))
.andThen(verifyCatalogs.inject(atOnceUsers(1)).protocols(httpProtocol))
.andThen(verifyNamespaces.inject(atOnceUsers(dp.nsDepth)).protocols(httpProtocol))
.andThen(verifyTables.inject(atOnceUsers(tableThroughput)).protocols(httpProtocol))
.andThen(verifyViews.inject(atOnceUsers(viewThroughput)).protocols(httpProtocol))
-
.andThen(stopRefreshingToken.inject(atOnceUsers(1)).protocols(httpProtocol))
+
.andThen(setupActions.stopRefreshingToken.inject(atOnceUsers(1)).protocols(httpProtocol))
)
}
diff --git
a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/ReadUpdateTreeDataset.scala
b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/ReadUpdateTreeDataset.scala
index 1827c0c..abf744f 100644
---
a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/ReadUpdateTreeDataset.scala
+++
b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/ReadUpdateTreeDataset.scala
@@ -24,6 +24,7 @@ import io.gatling.http.Predef._
import org.apache.polaris.benchmarks.actions._
import org.apache.polaris.benchmarks.parameters.BenchmarkConfig.config
import org.apache.polaris.benchmarks.parameters.{
+ AuthParameters,
ConnectionParameters,
DatasetParameters,
WorkloadParameters
@@ -31,7 +32,6 @@ import org.apache.polaris.benchmarks.parameters.{
import org.apache.polaris.benchmarks.util.CircularIterator
import org.slf4j.LoggerFactory
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
import scala.concurrent.duration._
/**
@@ -47,20 +47,18 @@ class ReadUpdateTreeDataset extends Simulation {
// Load parameters
//
--------------------------------------------------------------------------------
val cp: ConnectionParameters = config.connectionParameters
+ val ap: AuthParameters = config.authParameters
val dp: DatasetParameters = config.datasetParameters
val wp: WorkloadParameters = config.workloadParameters
//
--------------------------------------------------------------------------------
// Helper values
//
--------------------------------------------------------------------------------
- private val accessToken: AtomicReference[String] = new AtomicReference()
- private val shouldRefreshToken: AtomicBoolean = new AtomicBoolean(true)
-
- private val authActions = AuthenticationActions(cp, accessToken)
- private val catActions = CatalogActions(dp, accessToken)
- private val nsActions = NamespaceActions(dp, wp, accessToken)
- private val tblActions = TableActions(dp, wp, accessToken)
- private val viewActions = ViewActions(dp, wp, accessToken)
+ private val setupActions = SetupActions(cp, ap)
+ private val catActions = CatalogActions(dp, setupActions.accessToken)
+ private val nsActions = NamespaceActions(dp, wp, setupActions.accessToken)
+ private val tblActions = TableActions(dp, wp, setupActions.accessToken)
+ private val viewActions = ViewActions(dp, wp, setupActions.accessToken)
private val nsListFeeder = new
CircularIterator(nsActions.namespaceIdentityFeeder)
private val nsExistsFeeder = new
CircularIterator(nsActions.namespaceIdentityFeeder)
@@ -77,39 +75,12 @@ class ReadUpdateTreeDataset extends Simulation {
private val viewFetchFeeder = new
CircularIterator(viewActions.viewFetchFeeder)
private val viewUpdateFeeder = viewActions.propertyUpdateFeeder()
- //
--------------------------------------------------------------------------------
- // Authentication related workloads:
- // * Authenticate and store the access token for later use every minute
- // * Wait for an OAuth token to be available
- // * Stop the token refresh loop
- //
--------------------------------------------------------------------------------
- val continuouslyRefreshOauthToken: ScenarioBuilder =
- scenario("Authenticate every minute using the Iceberg REST API")
- .asLongAs(_ => shouldRefreshToken.get()) {
- feed(authActions.feeder())
- .exec(authActions.authenticateAndSaveAccessToken)
- .pause(1.minute)
- }
-
- val waitForAuthentication: ScenarioBuilder =
- scenario("Wait for the authentication token to be available")
- .asLongAs(_ => accessToken.get() == null) {
- pause(1.second)
- }
-
- val stopRefreshingToken: ScenarioBuilder =
- scenario("Stop refreshing the authentication token")
- .exec { session =>
- shouldRefreshToken.set(false)
- session
- }
-
//
--------------------------------------------------------------------------------
// Workload: Randomly read and write entities
//
--------------------------------------------------------------------------------
val readWriteScenario: ScenarioBuilder =
scenario("Read and write entities using the Iceberg REST API")
- .exec(authActions.restoreAccessTokenInSession)
+ .exec(setupActions.restoreAccessTokenInSession)
.randomSwitch(
wp.readUpdateTreeDataset.gatlingReadRatio -> group("Read")(
uniformRandomSwitch(
@@ -147,8 +118,8 @@ class ReadUpdateTreeDataset extends Simulation {
private val durationInMinutes = wp.readUpdateTreeDataset.durationInMinutes
setUp(
-
continuouslyRefreshOauthToken.inject(atOnceUsers(1)).protocols(httpProtocol),
- waitForAuthentication
+
setupActions.continuouslyRefreshOauthToken().inject(atOnceUsers(1)).protocols(httpProtocol),
+ setupActions.waitForAuthentication
.inject(atOnceUsers(1))
.andThen(
readWriteScenario
@@ -157,6 +128,6 @@ class ReadUpdateTreeDataset extends Simulation {
)
.protocols(httpProtocol)
)
-
.andThen(stopRefreshingToken.inject(atOnceUsers(1)).protocols(httpProtocol))
+
.andThen(setupActions.stopRefreshingToken.inject(atOnceUsers(1)).protocols(httpProtocol))
)
}
diff --git
a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala
b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala
index f6e8dc1..bc6376e 100644
---
a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala
+++
b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala
@@ -24,6 +24,7 @@ import io.gatling.http.Predef._
import org.apache.polaris.benchmarks.actions._
import org.apache.polaris.benchmarks.parameters.BenchmarkConfig.config
import org.apache.polaris.benchmarks.parameters.{
+ AuthParameters,
ConnectionParameters,
DatasetParameters,
Distribution,
@@ -32,7 +33,6 @@ import org.apache.polaris.benchmarks.parameters.{
}
import org.slf4j.LoggerFactory
-import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.duration._
/**
@@ -47,6 +47,7 @@ class WeightedWorkloadOnTreeDataset extends Simulation {
// Load parameters
//
--------------------------------------------------------------------------------
val cp: ConnectionParameters = config.connectionParameters
+ val ap: AuthParameters = config.authParameters
val dp: DatasetParameters = config.datasetParameters
val wp: WorkloadParameters = config.workloadParameters
@@ -62,30 +63,13 @@ class WeightedWorkloadOnTreeDataset extends Simulation {
println("### Writer distributions ###")
wp.weightedWorkloadOnTreeDataset.writers.foreach(_.printDescription(dp))
- //
--------------------------------------------------------------------------------
- // Helper values
- //
--------------------------------------------------------------------------------
- private val accessToken: AtomicReference[String] = new AtomicReference()
-
- private val authActions = AuthenticationActions(cp, accessToken)
- private val tblActions = TableActions(dp, wp, accessToken)
+ private val duration =
wp.weightedWorkloadOnTreeDataset.durationInMinutes.minutes
//
--------------------------------------------------------------------------------
- // Authentication related workloads
+ // Helper values
//
--------------------------------------------------------------------------------
- val refreshOauthForDuration: ScenarioBuilder =
- scenario("Authenticate every 30s using the Iceberg REST API")
- .during(wp.weightedWorkloadOnTreeDataset.durationInMinutes.minutes) {
- feed(authActions.feeder())
- .exec(authActions.authenticateAndSaveAccessToken)
- .pause(30.seconds)
- }
-
- val waitForAuthentication: ScenarioBuilder =
- scenario("Wait for the authentication token to be available")
- .asLongAs(_ => accessToken.get() == null) {
- pause(1.second)
- }
+ private val setupActions = SetupActions(cp, ap)
+ private val tblActions = TableActions(dp, wp, setupActions.accessToken)
//
--------------------------------------------------------------------------------
// Build up the HTTP protocol configuration and set up the simulation
@@ -105,8 +89,8 @@ class WeightedWorkloadOnTreeDataset extends Simulation {
val rnp =
RandomNumberProvider(wp.weightedWorkloadOnTreeDataset.seed, ((i + 1)
* 1000) + threadId)
scenario(s"Reader-$i-$threadId")
- .exec(authActions.restoreAccessTokenInSession)
- .during(wp.weightedWorkloadOnTreeDataset.durationInMinutes.minutes) {
+ .exec(setupActions.restoreAccessTokenInSession)
+ .during(duration) {
exec { session =>
val tableIndex = dist.sample(dp.maxPossibleTables, rnp)
val (catalog, namespace, table) =
@@ -136,8 +120,8 @@ class WeightedWorkloadOnTreeDataset extends Simulation {
val rnp =
RandomNumberProvider(wp.weightedWorkloadOnTreeDataset.seed, ((i + 1)
* 2000) + threadId)
scenario(s"Writer-$i-$threadId")
- .exec(authActions.restoreAccessTokenInSession)
- .during(wp.weightedWorkloadOnTreeDataset.durationInMinutes.minutes) {
+ .exec(setupActions.restoreAccessTokenInSession)
+ .during(duration) {
exec { session =>
val tableIndex = dist.sample(dp.maxPossibleTables, rnp)
val (catalog, namespace, table) =
@@ -161,8 +145,8 @@ class WeightedWorkloadOnTreeDataset extends Simulation {
// Setup
//
--------------------------------------------------------------------------------
setUp(
- refreshOauthForDuration.inject(atOnceUsers(1)).protocols(httpProtocol),
- waitForAuthentication
+
setupActions.refreshOauthForDuration(duration).inject(atOnceUsers(1)).protocols(httpProtocol),
+ setupActions.waitForAuthentication
.inject(atOnceUsers(1))
.protocols(httpProtocol)
.andThen(