This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cassandra-easy-stress.git
The following commit(s) were added to refs/heads/main by this push:
new 8407586 Added a new run flag --parquet which will take all client
latency events and write to a Apache Parquet file (#44)
8407586 is described below
commit 8407586c90a0b1063416c8e47ce1ba1ad160c369
Author: dcapwell <[email protected]>
AuthorDate: Fri May 23 16:38:08 2025 -0700
Added a new run flag --parquet which will take all client latency events
and write to a Apache Parquet file (#44)
---
build.gradle | 7 ++
manual/MANUAL.adoc | 87 ++++++++++++++++++-
manual/examples/cassandra-easy-stress-help.txt | 5 ++
.../com/rustyrazorblade/easycassstress/Either.kt | 7 ++
.../com/rustyrazorblade/easycassstress/Metrics.kt | 6 --
.../easycassstress/OperationCallback.kt | 35 +++-----
.../easycassstress/ProfileRunner.kt | 8 +-
.../rustyrazorblade/easycassstress/RequestQueue.kt | 22 +----
.../easycassstress/StressContext.kt | 44 +++++++++-
.../easycassstress/collector/AsyncCollector.kt | 86 +++++++++++++++++++
.../easycassstress/collector/Collector.kt | 25 ++++++
.../easycassstress/collector/CompositeCollector.kt | 32 +++++++
.../easycassstress/collector/HdrCollector.kt | 63 ++++++++++++++
.../easycassstress/collector/ParquetCollector.kt | 98 ++++++++++++++++++++++
.../rustyrazorblade/easycassstress/commands/Run.kt | 88 +++++++++++--------
.../easycassstress/workloads/IStressProfile.kt | 32 +++----
.../collector/ParquetCollectorTest.kt | 65 ++++++++++++++
17 files changed, 609 insertions(+), 101 deletions(-)
diff --git a/build.gradle b/build.gradle
index 5b1a7b3..801d6ac 100644
--- a/build.gradle
+++ b/build.gradle
@@ -101,6 +101,13 @@ dependencies {
implementation 'org.apache.commons:commons-math3:3.6.1'
implementation 'org.hdrhistogram:HdrHistogram:2.1.12'
+ implementation("org.agrona:agrona:1.22.0") // can't use the 2.x or 1.23+
line as it requires JDK 17
+
+ // for Parquet support
+ implementation("org.apache.parquet:parquet-hadoop:1.15.2")
+ implementation 'org.apache.hadoop:hadoop-common:3.4.1'
+ implementation 'org.apache.hadoop:hadoop-mapreduce-client-common:3.4.1'
+
// exporting dropwizard metrics
testImplementation group: 'org.junit.jupiter', name:
'junit-jupiter-engine', version: '5.1.0'
diff --git a/manual/MANUAL.adoc b/manual/MANUAL.adoc
index d2cf85b..7aae32c 100644
--- a/manual/MANUAL.adoc
+++ b/manual/MANUAL.adoc
@@ -53,7 +53,7 @@ $ sudo apt install cassandra-easy-stress
==== RPM Packages
-eYou'll need the bintray repo set up on your machine. Create this
`/etc/yum.repos.d/tlp-tools.repo`:
+You'll need the bintray repo set up on your machine. Create this
`/etc/yum.repos.d/tlp-tools.repo`:
```
[bintraybintray-rustyrazorblade-tlp-tools-rpm]
@@ -315,6 +315,91 @@ The Debian package installs a basic configuration file to
`/etc/cassandra-easy-s
cassandra-easy-stress automatically runs an HTTP server exporting metrics in
Prometheus format on port 9501.
+=== Capturing Client Latencies to Apache Parquet
+
+You can capture detailed client latency metrics to an Apache Parquet file
using the `--parquet` flag followed by a path to a file or directory:
+
+```
+$ bin/cassandra-easy-stress run KeyValue --duration 5m --parquet rawlog.parquet
+```
+
+This writes operation metrics including operation type, success/failure
status, start time, and duration to a Parquet file that can be analyzed later
with data analysis tools like Pandas, Spark, DuckDB, or visualization tools
that support the Parquet format.
+
+If a directory is provided instead of a file, cassandra-easy-stress will
automatically create an appropriately named file in that directory.
+
+==== Analyzing Parquet Files with DuckDB
+
+The Parquet files created by cassandra-easy-stress can be easily analyzed
using DuckDB, a lightweight analytical database engine. Here are some example
queries to get you started:
+
+```sql
+-- Show summary statistics for operation latencies for every minute
+SELECT date_trunc('minute', epoch_ms(request_start_time_ms)) as minute,
+ COUNT(*) as count,
+ AVG(request_duration_ns / 1000 / 1000) as avg,
+ MIN(request_duration_ns / 1000 / 1000) as min,
+ MAX(request_duration_ns / 1000 / 1000) as max,
+ APPROX_QUANTILE(request_duration_ns / 1000 / 1000, .5) as p50,
+ APPROX_QUANTILE(request_duration_ns / 1000 / 1000, .9) as p90,
+ APPROX_QUANTILE(request_duration_ns / 1000 / 1000, .99) as p99,
+ COUNT(CASE WHEN failure_reason != '' THEN 1 END) AS errors,
+ COUNT(CASE WHEN failure_reason = 'ReadTimeoutException' THEN 1 END) AS
read_timeouts,
+ COUNT(CASE WHEN failure_reason = 'WriteTimeoutException' THEN 1 END) AS
write_timeouts,
+FROM read_parquet('rawlog.parquet')
+GROUP BY minute
+ORDER BY minute;
+
+┌─────────────────────┬────────┬─────────────────────┬──────────┬────────────────────┬─────────────────────┬─────────────────────┬────────────────────┬────────┬───────────────┬────────────────┐
+│ minute │ count │ avg │ min │ max
│ p50 │ p90 │ p99 │ errors
│ read_timeouts │ write_timeouts │
+│ timestamp │ int64 │ double │ double │ double
│ double │ double │ double │ int64
│ int64 │ int64 │
+├─────────────────────┼────────┼─────────────────────┼──────────┼────────────────────┼─────────────────────┼─────────────────────┼────────────────────┼────────┼───────────────┼────────────────┤
+│ 2025-05-23 22:45:00 │ 141911 │ 18.891404855317813 │ 0.088042 │
1305.993875 │ 0.23617621307864622 │ 0.5454138286498701 │ 755.160273634975 │
0 │ 0 │ 0 │
+│ 2025-05-23 22:46:00 │ 300081 │ 0.26154326542833034 │ 0.091458 │
16.620042 │ 0.2198726495794396 │ 0.28866495065114356 │ 1.0759477146627234 │
0 │ 0 │ 0 │
+│ 2025-05-23 22:47:00 │ 300075 │ 0.29655502679997087 │ 0.089208 │
19.247875 │ 0.2241928371244093 │ 0.3096208807374364 │ 1.8582042492087465 │
0 │ 0 │ 0 │
+│ 2025-05-23 22:48:00 │ 298543 │ 0.6524298801211285 │ 0.093666 │
198.99904199999997 │ 0.22677466153454573 │ 0.33573102120265713 │
9.839418314581492 │ 0 │ 0 │ 0 │
+│ 2025-05-23 22:49:00 │ 300053 │ 0.30696147925533085 │ 0.100167 │
64.216666 │ 0.2249157848195121 │ 0.3072763658664282 │ 1.6342296967730887 │
0 │ 0 │ 0 │
+│ 2025-05-23 22:50:00 │ 24765 │ 0.4530204902079576 │ 0.12675 │
39.548167 │ 0.2259263537252715 │ 0.30608390597020046 │ 7.7139616210838575 │
0 │ 0 │ 0 │
+└─────────────────────┴────────┴─────────────────────┴──────────┴────────────────────┴─────────────────────┴─────────────────────┴────────────────────┴────────┴───────────────┴────────────────┘
+
+
+-- Show error counts
+SELECT failure_reason, count(*)
+FROM read_parquet('rawlog.parquet')
+WHERE failure_reason != ''
+GROUP BY failure_reason;
+```
+
+You can also use DuckDB through its various clients including Python, R, Java,
and JDBC.
+
+==== Understanding Request Time vs Service Time
+
+When analyzing latency data from the Parquet files, it's important to
understand the distinction between two key metrics:
+
+* **Service Time**: This is the actual time it takes for the database to
process a request and return a response once the request is received by the
database. It measures only the execution time of the operation.
+
+* **Request Time**: This is the total time from when the client intended to
make the request until receiving the response. It includes the service time
plus any queue time or delays that might have occurred before the request was
actually sent to the database.
+
+The difference between these metrics is critical for understanding coordinated
omission, a common problem in performance testing where the test client doesn't
accurately capture the full latency that would be experienced by real users
when the system is under load.
+
+For example, if your database is overloaded and can only process 100
operations per second, but your test is trying to send 200 operations per
second:
+
+* A naïve benchmark would only measure the service time of the operations that
actually got processed, missing the fact that half the operations were delayed.
+* A properly instrumented benchmark (like cassandra-easy-stress) captures the
request time, which includes how long operations had to wait in a queue.
+
+When using the Parquet files for analysis, you can examine both metrics to get
a more complete picture of your system's performance under load:
+
+```sql
+-- Compare average service time vs request time by operation type
+SELECT
+ operation,
+ AVG(service_duration_ns / 1000 / 1000) as avg_service_time_ms,
+ AVG(request_duration_ns / 1000 / 1000) as avg_request_time_ms,
+ AVG(request_duration_ns - service_duration_ns) / 1000 / 1000 as
avg_queue_time_ms
+FROM read_parquet('rawlog.parquet')
+GROUP BY operation;
+```
+
+A significant difference between average request time and average service time
indicates queuing or scheduling delays in your system, which can be an early
warning sign of performance bottlenecks.
+
=== Workload Restrictions
The `BasicTimeSeries` workload only supports Cassandra versions 3.0 and above.
This is because range deletes are used by this workload during runtime. Range
deletes are only supported in Cassandra versions 3.0 and above. An exception is
thrown if this workload is used and a Cassandra version less than 3.0 is
detected during runtime.
diff --git a/manual/examples/cassandra-easy-stress-help.txt
b/manual/examples/cassandra-easy-stress-help.txt
index 1b71f3a..a32b22a 100644
--- a/manual/examples/cassandra-easy-stress-help.txt
+++ b/manual/examples/cassandra-easy-stress-help.txt
@@ -97,6 +97,11 @@ Usage: cassandra-easy-stress [options] [command] [command
options]
Default: false
--paging
Override the driver's default page size.
+ --parquet
+ Capture client latency metrics to a Apache Parquet file at the
+ specified path. If the file is a directory, the file will be
+ named rawlog.parquet within that directory
+ Default: <empty string>
--partitiongenerator, --pg
Method of generating partition keys. Supports random, normal
(gaussian), and sequence.
diff --git a/src/main/kotlin/com/rustyrazorblade/easycassstress/Either.kt
b/src/main/kotlin/com/rustyrazorblade/easycassstress/Either.kt
new file mode 100644
index 0000000..75135c3
--- /dev/null
+++ b/src/main/kotlin/com/rustyrazorblade/easycassstress/Either.kt
@@ -0,0 +1,7 @@
+package com.rustyrazorblade.easycassstress
+
+sealed class Either<out A, out B> {
+ class Left<A>(val value: A) : Either<A, Nothing>()
+
+ class Right<B>(val value: B) : Either<Nothing, B>()
+}
diff --git a/src/main/kotlin/com/rustyrazorblade/easycassstress/Metrics.kt
b/src/main/kotlin/com/rustyrazorblade/easycassstress/Metrics.kt
index 722868c..b5245f3 100644
--- a/src/main/kotlin/com/rustyrazorblade/easycassstress/Metrics.kt
+++ b/src/main/kotlin/com/rustyrazorblade/easycassstress/Metrics.kt
@@ -22,7 +22,6 @@ import com.codahale.metrics.ScheduledReporter
import io.prometheus.client.CollectorRegistry
import io.prometheus.client.dropwizard.DropwizardExports
import io.prometheus.client.exporter.HTTPServer
-import org.HdrHistogram.SynchronizedHistogram
import java.util.Optional
import java.util.concurrent.TimeUnit
@@ -71,11 +70,6 @@ class Metrics(val metricRegistry: MetricRegistry, val
reporters: List<ScheduledR
val deletionThroughputTracker = getTracker { deletions.count }.start()
val populateThroughputTracker = getTracker { populate.count }.start()
- // Using a synchronized histogram for now, we may need to change this
later if it's a perf bottleneck
- val mutationHistogram = SynchronizedHistogram(2)
- val selectHistogram = SynchronizedHistogram(2)
- val deleteHistogram = SynchronizedHistogram(2)
-
/**
* We track throughput using separate structures than Dropwizard
*/
diff --git
a/src/main/kotlin/com/rustyrazorblade/easycassstress/OperationCallback.kt
b/src/main/kotlin/com/rustyrazorblade/easycassstress/OperationCallback.kt
index 3dd8ad9..f4969c9 100644
--- a/src/main/kotlin/com/rustyrazorblade/easycassstress/OperationCallback.kt
+++ b/src/main/kotlin/com/rustyrazorblade/easycassstress/OperationCallback.kt
@@ -18,9 +18,11 @@
package com.rustyrazorblade.easycassstress
import com.datastax.oss.driver.api.core.cql.AsyncResultSet
+import com.google.common.base.Throwables
import com.rustyrazorblade.easycassstress.workloads.IStressRunner
import com.rustyrazorblade.easycassstress.workloads.Operation
import org.apache.logging.log4j.kotlin.logger
+import java.util.concurrent.TimeUnit
import java.util.function.BiConsumer
/**
@@ -32,8 +34,9 @@ class OperationCallback(
val context: StressContext,
val runner: IStressRunner,
val op: Operation,
+ val startNanos: Long,
+ val populatePhase: Boolean,
val paginate: Boolean = false,
- val writeHdr: Boolean = true,
) : BiConsumer<AsyncResultSet?, Throwable?> {
companion object {
val log = logger()
@@ -45,6 +48,7 @@ class OperationCallback(
) {
if (t != null) {
context.metrics.errors.mark()
+ context.collect(op, Either.Right(Throwables.getRootCause(t)),
startNanos, System.nanoTime())
log.error { t }
return
}
@@ -56,39 +60,26 @@ class OperationCallback(
result.fetchNextPage()
}
}
+ val endNanos = System.nanoTime()
+ context.timer(op, populatePhase).update(endNanos - op.createdAtNanos,
TimeUnit.NANOSECONDS)
+ // TODO (visibility): include details about paging?
+ context.collect(op, Either.Left(result!!), startNanos, endNanos)
- val time = op.startTime.stop()
-
- // we log to the HDR histogram and do the callback for mutations
+ // do the callback for mutations
// might extend this to select, but I can't see a reason for it now
when (op) {
is Operation.Mutation -> {
- if (writeHdr) {
- context.metrics.mutationHistogram.recordValue(time)
- }
runner.onSuccess(op, result)
}
-
- is Operation.Deletion -> {
- if (writeHdr) {
- context.metrics.deleteHistogram.recordValue(time)
- }
- }
-
- is Operation.SelectStatement -> {
- if (writeHdr) {
- context.metrics.selectHistogram.recordValue(time)
- }
- }
is Operation.DDL -> {
- if (writeHdr) {
- context.metrics.mutationHistogram.recordValue(time)
- }
runner.onSuccess(op, result)
}
is Operation.Stop -> {
throw OperationStopException()
}
+ else -> {
+ // ignore
+ }
}
}
}
diff --git
a/src/main/kotlin/com/rustyrazorblade/easycassstress/ProfileRunner.kt
b/src/main/kotlin/com/rustyrazorblade/easycassstress/ProfileRunner.kt
index c94d6e6..fd0478b 100644
--- a/src/main/kotlin/com/rustyrazorblade/easycassstress/ProfileRunner.kt
+++ b/src/main/kotlin/com/rustyrazorblade/easycassstress/ProfileRunner.kt
@@ -142,6 +142,7 @@ class ProfileRunner(
var paginate = context.mainArguments.paginate
for (op in queue.getNextOperation()) {
// In driver v4, async execution returns a CompletionStage
+ val startNanos = System.nanoTime()
val future =
when (op) {
is Operation.DDL -> {
@@ -161,8 +162,9 @@ class ProfileRunner(
context,
runner,
op,
+ startNanos,
+ queue.populatePhase,
paginate = paginate,
- writeHdr = context.mainArguments.hdrHistogramPrefix != "",
)
future.whenComplete { result, error ->
@@ -200,6 +202,7 @@ class ProfileRunner(
try {
for (op in queue.getNextOperation()) {
+ val startNanos = System.nanoTime()
val future = context.session.executeAsync(op.bound!!)
// Create callback to handle the result
@@ -208,8 +211,9 @@ class ProfileRunner(
context,
runner,
op,
+ startNanos,
+ queue.populatePhase,
paginate = false,
- writeHdr = context.mainArguments.hdrHistogramPrefix !=
"",
)
future.whenComplete { result, error ->
diff --git a/src/main/kotlin/com/rustyrazorblade/easycassstress/RequestQueue.kt
b/src/main/kotlin/com/rustyrazorblade/easycassstress/RequestQueue.kt
index b6d65ad..f57e834 100644
--- a/src/main/kotlin/com/rustyrazorblade/easycassstress/RequestQueue.kt
+++ b/src/main/kotlin/com/rustyrazorblade/easycassstress/RequestQueue.kt
@@ -17,7 +17,6 @@
*/
package com.rustyrazorblade.easycassstress
-import com.codahale.metrics.Timer
import com.rustyrazorblade.easycassstress.workloads.IStressRunner
import com.rustyrazorblade.easycassstress.workloads.Operation
import org.apache.logging.log4j.kotlin.logger
@@ -38,7 +37,7 @@ class RequestQueue(
runner: IStressRunner,
readRate: Double,
deleteRate: Double,
- populatePhase: Boolean = false,
+ val populatePhase: Boolean = false,
) {
val queue =
ArrayBlockingQueue<Operation>(context.mainArguments.queueDepth.toInt(), true)
var generatorThread: Thread
@@ -57,23 +56,6 @@ class RequestQueue(
var executed = 0L
log.info("populate=$populatePhase total values: $totalValues,
duration: $duration")
- // we're using a separate timer for populate phase
- // regardless of the operation performed
- fun getTimer(operation: Operation): Timer {
- return if (populatePhase) {
- context.metrics.populate
- } else {
- when (operation) {
- is Operation.SelectStatement ->
context.metrics.selects
- is Operation.Mutation -> context.metrics.mutations
- is Operation.Deletion -> context.metrics.deletions
- is Operation.Stop -> throw OperationStopException()
- // maybe this should be under DDL, it's a weird
case.
- is Operation.DDL -> context.metrics.mutations
- }
- }
- }
-
for (key in partitionKeyGenerator.generateKey(totalValues,
context.mainArguments.partitionValues)) {
if (duration > 0 &&
desiredEndTime.isBefore(LocalDateTime.now())) {
log.info("Reached duration, ending")
@@ -113,8 +95,6 @@ class RequestQueue(
runner.getNextMutation(key)
}
- op.startTime = getTimer(op).time()
-
if (!queue.offer(op)) {
context.metrics.errors.mark()
}
diff --git
a/src/main/kotlin/com/rustyrazorblade/easycassstress/StressContext.kt
b/src/main/kotlin/com/rustyrazorblade/easycassstress/StressContext.kt
index 8a3198d..bc6d95b 100644
--- a/src/main/kotlin/com/rustyrazorblade/easycassstress/StressContext.kt
+++ b/src/main/kotlin/com/rustyrazorblade/easycassstress/StressContext.kt
@@ -17,10 +17,14 @@
*/
package com.rustyrazorblade.easycassstress
+import com.codahale.metrics.Timer
import com.datastax.oss.driver.api.core.CqlSession
+import com.datastax.oss.driver.api.core.cql.AsyncResultSet
import com.google.common.util.concurrent.RateLimiter
+import com.rustyrazorblade.easycassstress.collector.Collector
import com.rustyrazorblade.easycassstress.commands.Run
import com.rustyrazorblade.easycassstress.generators.Registry
+import com.rustyrazorblade.easycassstress.workloads.Operation
data class StressContext(
val session: CqlSession,
@@ -29,4 +33,42 @@ data class StressContext(
val metrics: Metrics,
val registry: Registry,
val rateLimiter: RateLimiter?,
-)
+ val collector: Collector,
+) {
+ fun collect(
+ op: Operation,
+ result: Either<AsyncResultSet, Throwable>,
+ startNanos: Long,
+ endNanos: Long,
+ ) = collector.collect(this, op, result, startNanos, endNanos)
+
+ // we're using a separate timer for populate phase
+ // regardless of the operation performed
+ fun timer(
+ op: Operation,
+ populatePhase: Boolean,
+ ): Timer =
+ if (populatePhase) {
+ metrics.populate
+ } else {
+ when (op) {
+ is Operation.SelectStatement -> metrics.selects
+ is Operation.Mutation -> metrics.mutations
+ is Operation.Deletion -> metrics.deletions
+ is Operation.Stop -> throw OperationStopException()
+ // maybe this should be under DDL, it's a weird case.
+ is Operation.DDL -> metrics.mutations
+ }
+ }
+}
+
+data class Context(
+ val session: CqlSession,
+ val mainArguments: Run,
+ val metrics: Metrics,
+ val registry: Registry,
+ val rateLimiter: RateLimiter?,
+ val collector: Collector,
+) {
+ fun stress(thread: Int): StressContext = StressContext(session,
mainArguments, thread, metrics, registry, rateLimiter, collector)
+}
diff --git
a/src/main/kotlin/com/rustyrazorblade/easycassstress/collector/AsyncCollector.kt
b/src/main/kotlin/com/rustyrazorblade/easycassstress/collector/AsyncCollector.kt
new file mode 100644
index 0000000..2c2e861
--- /dev/null
+++
b/src/main/kotlin/com/rustyrazorblade/easycassstress/collector/AsyncCollector.kt
@@ -0,0 +1,86 @@
+package com.rustyrazorblade.easycassstress.collector
+
+import com.datastax.oss.driver.api.core.cql.AsyncResultSet
+import com.rustyrazorblade.easycassstress.Context
+import com.rustyrazorblade.easycassstress.Either
+import com.rustyrazorblade.easycassstress.StressContext
+import com.rustyrazorblade.easycassstress.workloads.Operation
+import io.netty.util.internal.shaded.org.jctools.queues.MpscArrayQueue
+import org.agrona.concurrent.BackoffIdleStrategy
+import java.io.Closeable
+import java.io.File
+import java.util.concurrent.atomic.AtomicInteger
+
+/**
+ * Base type for collectors that have "expensive" work that needs to happen
off-thread. Every call to collect will
+ * generate an object, push to a queue, and processed in another worker thread.
+ *
+ * Implementations must define the writer interface which will be called to do
the "real" work for the collector.
+ */
+abstract class AsyncCollector(
+ fileOrDirectory: File,
+) : Collector {
+ data class Event(
+ val op: Operation,
+ val result: Either<AsyncResultSet, Throwable>,
+ val startNanos: Long,
+ val endNanos: Long,
+ )
+
+ interface Writer : Closeable {
+ fun write(event: Event)
+ }
+
+ private val queue =
MpscArrayQueue<Event>(Integer.getInteger("cassandra-easy-stress.event_csv_queue_size",
4096))
+ private val writer = createWriter(fileOrDirectory)
+
+ @Volatile
+ private var running = true
+ private val thread = Thread(this::run)
+ private val idleStrategy = BackoffIdleStrategy()
+
+ init {
+ thread.isDaemon = true
+ thread.name = "cassandra-easy-stress event raw log collector"
+ thread.start()
+ }
+
+ val dropped = AtomicInteger()
+ val counter = AtomicInteger()
+
+ abstract fun createWriter(fileOrDirectory: File): Writer
+
+ override fun collect(
+ ctx: StressContext,
+ op: Operation,
+ result: Either<AsyncResultSet, Throwable>,
+ startNanos: Long,
+ endNanos: Long,
+ ) {
+ if (!queue.offer(Event(op, result, startNanos, endNanos))) {
+ dropped.incrementAndGet()
+ }
+ }
+
+ private fun run() {
+ while (running) {
+ try {
+ val processed = queue.drain(writer::write)
+ counter.addAndGet(processed)
+ idleStrategy.idle(processed)
+ } catch (t: Throwable) {
+ System.err.println("Exception while writing raw logs")
+ t.printStackTrace()
+ running = false
+ return
+ }
+ }
+ }
+
+ override fun close(context: Context) {
+ running = false
+ thread.join()
+ writer.close()
+ println("Wrote ${counter.get()} events; Dropped ${dropped.get()}
events")
+ }
+}
diff --git
a/src/main/kotlin/com/rustyrazorblade/easycassstress/collector/Collector.kt
b/src/main/kotlin/com/rustyrazorblade/easycassstress/collector/Collector.kt
new file mode 100644
index 0000000..8005305
--- /dev/null
+++ b/src/main/kotlin/com/rustyrazorblade/easycassstress/collector/Collector.kt
@@ -0,0 +1,25 @@
+package com.rustyrazorblade.easycassstress.collector
+
+import com.datastax.oss.driver.api.core.cql.AsyncResultSet
+import com.rustyrazorblade.easycassstress.Context
+import com.rustyrazorblade.easycassstress.Either
+import com.rustyrazorblade.easycassstress.StressContext
+import com.rustyrazorblade.easycassstress.workloads.Operation
+
+/**
+ * When an operation completes (success or failure) this interface is called
showing the state at that moment. This
+ * interface is part of the "hot" path and as such implementations should
respect that and should push expensive work
+ * outside the thread calling this.
+ */
+interface Collector {
+ fun collect(
+ ctx: StressContext,
+ op: Operation,
+ result: Either<AsyncResultSet, Throwable>,
+ startNanos: Long,
+ endNanos: Long,
+ )
+
+ fun close(context: Context) {
+ }
+}
diff --git
a/src/main/kotlin/com/rustyrazorblade/easycassstress/collector/CompositeCollector.kt
b/src/main/kotlin/com/rustyrazorblade/easycassstress/collector/CompositeCollector.kt
new file mode 100644
index 0000000..5d0b212
--- /dev/null
+++
b/src/main/kotlin/com/rustyrazorblade/easycassstress/collector/CompositeCollector.kt
@@ -0,0 +1,32 @@
+package com.rustyrazorblade.easycassstress.collector
+
+import com.datastax.oss.driver.api.core.cql.AsyncResultSet
+import com.rustyrazorblade.easycassstress.Context
+import com.rustyrazorblade.easycassstress.Either
+import com.rustyrazorblade.easycassstress.StressContext
+import com.rustyrazorblade.easycassstress.workloads.Operation
+
+/**
+ * Groups a list of collectors together and will call each in the same order.
+ */
+class CompositeCollector(
+ private vararg val collectors: Collector,
+) : Collector {
+ override fun collect(
+ ctx: StressContext,
+ op: Operation,
+ result: Either<AsyncResultSet, Throwable>,
+ startNanos: Long,
+ endNanos: Long,
+ ) {
+ for (c in collectors) {
+ c.collect(ctx, op, result, startNanos, endNanos)
+ }
+ }
+
+ override fun close(context: Context) {
+ for (c in collectors) {
+ c.close(context)
+ }
+ }
+}
diff --git
a/src/main/kotlin/com/rustyrazorblade/easycassstress/collector/HdrCollector.kt
b/src/main/kotlin/com/rustyrazorblade/easycassstress/collector/HdrCollector.kt
new file mode 100644
index 0000000..11d422b
--- /dev/null
+++
b/src/main/kotlin/com/rustyrazorblade/easycassstress/collector/HdrCollector.kt
@@ -0,0 +1,63 @@
+package com.rustyrazorblade.easycassstress.collector
+
+import com.datastax.oss.driver.api.core.cql.AsyncResultSet
+import com.rustyrazorblade.easycassstress.Context
+import com.rustyrazorblade.easycassstress.Either
+import com.rustyrazorblade.easycassstress.StressContext
+import com.rustyrazorblade.easycassstress.workloads.Operation
+import org.HdrHistogram.SynchronizedHistogram
+import java.io.File
+import java.io.PrintStream
+
+/**
+ * Stores all events into an HdrHistogram and publishes to 3 files at the end
of the run
+ */
+class HdrCollector(
+ val hdrHistogramPrefix: String,
+) : Collector {
+ // Using a synchronized histogram for now, we may need to change this
later if it's a perf bottleneck
+ val mutationHistogram = SynchronizedHistogram(2)
+ val selectHistogram = SynchronizedHistogram(2)
+ val deleteHistogram = SynchronizedHistogram(2)
+
+ override fun collect(
+ ctx: StressContext,
+ op: Operation,
+ result: Either<AsyncResultSet, Throwable>,
+ startNanos: Long,
+ endNanos: Long,
+ ) {
+ if (result is Either.Right) return // only success is tracked
+
+ // we log to the HDR histogram and do the callback for mutations
+ // might extend this to select, but I can't see a reason for it now
+ when (op) {
+ is Operation.Mutation, is Operation.DDL -> {
+ mutationHistogram.recordValue(endNanos - op.createdAtNanos)
+ }
+ is Operation.Deletion -> {
+ deleteHistogram.recordValue(endNanos - op.createdAtNanos)
+ }
+ is Operation.SelectStatement -> {
+ selectHistogram.recordValue(endNanos - op.createdAtNanos)
+ }
+ else -> {
+ // ignore
+ }
+ }
+ }
+
+ override fun close(ctx: Context) {
+ // print out the hdr histograms if requested to 3 separate files
+ val pairs =
+ listOf(
+ Pair(mutationHistogram, "mutations"),
+ Pair(selectHistogram, "reads"),
+ Pair(deleteHistogram, "deletes"),
+ )
+ for (entry in pairs) {
+ val fp = File(hdrHistogramPrefix + "-" + entry.second + ".txt")
+ entry.first.outputPercentileDistribution(PrintStream(fp),
1_000_000.0)
+ }
+ }
+}
diff --git
a/src/main/kotlin/com/rustyrazorblade/easycassstress/collector/ParquetCollector.kt
b/src/main/kotlin/com/rustyrazorblade/easycassstress/collector/ParquetCollector.kt
new file mode 100644
index 0000000..07732e8
--- /dev/null
+++
b/src/main/kotlin/com/rustyrazorblade/easycassstress/collector/ParquetCollector.kt
@@ -0,0 +1,98 @@
+package com.rustyrazorblade.easycassstress.collector
+
+import com.datastax.oss.driver.api.core.cql.AsyncResultSet
+import com.google.common.base.Throwables
+import com.rustyrazorblade.easycassstress.Either
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.example.data.Group
+import org.apache.parquet.example.data.simple.SimpleGroupFactory
+import org.apache.parquet.hadoop.ParquetWriter
+import org.apache.parquet.hadoop.example.ExampleParquetWriter
+import org.apache.parquet.schema.MessageTypeParser
+import java.io.File
+import java.util.concurrent.TimeUnit
+
+/**
+ * Publishes all the requests to an Apache Parquet file to allow analytical
tools to process the raw data rather than
+ * having to rely solely on metrics (which are sampled).
+ */
+class ParquetCollector(
+ fileOrDirectory: File,
+) : AsyncCollector(fileOrDirectory) {
+ override fun createWriter(fileOrDirectory: File): Writer =
+ ParquetTableWriter(if (fileOrDirectory.isDirectory)
File(fileOrDirectory, "rawlog.parquet") else fileOrDirectory)
+
+ class ParquetTableWriter(
+ file: File,
+ ) : Writer {
+ private val writer: ParquetWriter<Group>
+ private val groupFactory: SimpleGroupFactory
+
+ init {
+ file.delete()
+ val path = Path("file://${file.absolutePath}")
+ println("Parquet Log path $path")
+ val schema =
+ MessageTypeParser.parseMessageType(
+ "message example {\n" +
+ " required binary operation (UTF8);\n" +
+ " required boolean success;\n" +
+ " required binary failure_reason (UTF8);\n" +
+ " required binary failure_stacktrace (UTF8);\n" +
+ " required int64 service_start_time_ms;\n" +
+ " required int64 service_duration_ns;\n" +
+ " required int64 request_start_time_ms;\n" +
+ " required int64 request_duration_ns;\n" +
+ "}",
+ )
+ groupFactory = SimpleGroupFactory(schema)
+ writer =
ExampleParquetWriter.builder(path).withType(schema).build()
+ }
+
+ override fun write(event: Event) {
+ val requestStartNanos = event.op.createdAtNanos
+ val requestStartMillis = event.op.createdAtMillis
+ val requestDurationNanos = event.endNanos - requestStartNanos
+
+ val serviceStartNanos = event.startNanos
+ val serviceStartMillis = requestStartMillis +
TimeUnit.NANOSECONDS.toMillis(serviceStartNanos - requestStartNanos)
+ val serviceDurationNanos = event.endNanos - serviceStartNanos
+ val group =
+ groupFactory
+ .newGroup()
+ .append("operation", op(event))
+ .append("success", event.result is Either.Left)
+ .append("failure_reason", reasonName(event.result))
+ .append("failure_stacktrace",
reasonStackTrace(event.result))
+ .append("service_start_time_ms", serviceStartMillis)
+ .append("service_duration_ns", serviceDurationNanos)
+ .append("request_start_time_ms", requestStartMillis)
+ .append("request_duration_ns", requestDurationNanos)
+ writer.write(group)
+ }
+
+ override fun close() {
+ writer.close()
+ }
+ }
+
+ companion object {
+ private fun op(event: Event) =
+ event.op.javaClass.simpleName
+ .replace("Statement", "")
+
+ private fun reasonName(result: Either<AsyncResultSet, Throwable>) =
+ if (result is Either.Right) {
+ result.value.javaClass.simpleName
+ } else {
+ ""
+ }
+
+ private fun reasonStackTrace(result: Either<AsyncResultSet,
Throwable>) =
+ if (result is Either.Right) {
+ Throwables.getStackTraceAsString(result.value)
+ } else {
+ ""
+ }
+ }
+}
diff --git a/src/main/kotlin/com/rustyrazorblade/easycassstress/commands/Run.kt
b/src/main/kotlin/com/rustyrazorblade/easycassstress/commands/Run.kt
index 35c6fd1..d49594b 100644
--- a/src/main/kotlin/com/rustyrazorblade/easycassstress/commands/Run.kt
+++ b/src/main/kotlin/com/rustyrazorblade/easycassstress/commands/Run.kt
@@ -29,6 +29,7 @@ import
com.datastax.oss.driver.api.core.config.DefaultDriverOption
import com.datastax.oss.driver.api.core.config.DriverConfigLoader
import com.google.common.base.Preconditions
import com.google.common.util.concurrent.RateLimiter
+import com.rustyrazorblade.easycassstress.Context
import com.rustyrazorblade.easycassstress.FileReporter
import com.rustyrazorblade.easycassstress.Metrics
import com.rustyrazorblade.easycassstress.Plugin
@@ -37,7 +38,10 @@ import com.rustyrazorblade.easycassstress.ProfileRunner
import com.rustyrazorblade.easycassstress.RateLimiterOptimizer
import com.rustyrazorblade.easycassstress.SchemaBuilder
import com.rustyrazorblade.easycassstress.SingleLineConsoleReporter
-import com.rustyrazorblade.easycassstress.StressContext
+import com.rustyrazorblade.easycassstress.collector.Collector
+import com.rustyrazorblade.easycassstress.collector.CompositeCollector
+import com.rustyrazorblade.easycassstress.collector.HdrCollector
+import com.rustyrazorblade.easycassstress.collector.ParquetCollector
import com.rustyrazorblade.easycassstress.converters.ConsistencyLevelConverter
import com.rustyrazorblade.easycassstress.converters.HumanReadableConverter
import com.rustyrazorblade.easycassstress.converters.HumanReadableTimeConverter
@@ -47,7 +51,6 @@ import me.tongfei.progressbar.ProgressBar
import me.tongfei.progressbar.ProgressBarStyle
import org.apache.logging.log4j.kotlin.logger
import java.io.File
-import java.io.PrintStream
import java.util.Timer
import kotlin.concurrent.fixedRateTimer
import kotlin.concurrent.schedule
@@ -56,9 +59,7 @@ import kotlin.concurrent.thread
val DEFAULT_ITERATIONS: Long = 1000000
class NoSplitter : IParameterSplitter {
- override fun split(value: String?): MutableList<String> {
- return mutableListOf(value!!)
- }
+ override fun split(value: String?): MutableList<String> =
mutableListOf(value!!)
}
/**
@@ -66,7 +67,9 @@ class NoSplitter : IParameterSplitter {
* It's used solely for logging and reporting purposes.
*/
@Parameters(commandDescription = "Run a cassandra-easy-stress profile")
-class Run(val command: String) : IStressCommand {
+class Run(
+ val command: String,
+) : IStressCommand {
@Parameter(names = ["--host"])
var host = System.getenv("CASSANDRA_EASY_STRESS_CASSANDRA_HOST") ?:
"127.0.0.1"
@@ -224,6 +227,14 @@ class Run(val command: String) : IStressCommand {
@Parameter(names = ["--csv"], description = "Write metrics to this file in
CSV format.")
var csvFile = ""
+ @Parameter(
+ names = ["--parquet"],
+ description =
+ "Capture client latency metrics to a Apache Parquet file at the
specified path. " +
+ "If the file is a directory, the file will be named
rawlog.parquet within that directory",
+ )
+ var parquetFile = ""
+
@Parameter(names = ["--paging"], description = "Override the driver's
default page size.")
var paging: Int? = null
@@ -278,7 +289,8 @@ class Run(val command: String) : IStressCommand {
val session by lazy {
// Build a programmatic config
var configLoaderBuilder =
- DriverConfigLoader.programmaticBuilder()
+ DriverConfigLoader
+ .programmaticBuilder()
// Default consistency levels
.withString(DefaultDriverOption.REQUEST_CONSISTENCY,
consistencyLevel.toString())
.withString(DefaultDriverOption.REQUEST_SERIAL_CONSISTENCY,
serialConsistencyLevel.toString())
@@ -309,14 +321,18 @@ class Run(val command: String) : IStressCommand {
// Build the CqlSession
val sessionBuilder =
- CqlSession.builder()
+ CqlSession
+ .builder()
.addContactPoint(java.net.InetSocketAddress(host, cqlPort))
.withAuthCredentials(username, password)
.withConfigLoader(configLoaderBuilder.build())
// Add SSL if needed
if (ssl) {
-
sessionBuilder.withSslContext(javax.net.ssl.SSLContext.getDefault())
+ sessionBuilder.withSslContext(
+ javax.net.ssl.SSLContext
+ .getDefault(),
+ )
}
// Show settings about to be used
@@ -405,9 +421,12 @@ class Run(val command: String) : IStressCommand {
var runnersExecuted = 0L
+ val collector = createCollector()
+ val context = Context(session, this, metrics, fieldRegistry,
rateLimiter, collector)
+
try {
// run the prepare for each
- val runners = createRunners(plugin, metrics, fieldRegistry,
rateLimiter)
+ val runners = createRunners(plugin, context)
populateData(plugin, runners, metrics)
metrics.resetErrors()
@@ -443,20 +462,6 @@ class Run(val command: String) : IStressCommand {
for (reporter in metrics.reporters) {
reporter.report()
}
-
- // print out the hdr histograms if requested to 3 separate files
- if (hdrHistogramPrefix != "") {
- val pairs =
- listOf(
- Pair(metrics.mutationHistogram, "mutations"),
- Pair(metrics.selectHistogram, "reads"),
- Pair(metrics.deleteHistogram, "deletes"),
- )
- for (entry in pairs) {
- val fp = File(hdrHistogramPrefix + "-" + entry.second +
".txt")
- entry.first.outputPercentileDistribution(PrintStream(fp),
1_000_000.0)
- }
- }
} catch (e: Exception) {
println(
"There was an error with cassandra-easy-stress. Please file a
bug at " +
@@ -467,12 +472,28 @@ class Run(val command: String) : IStressCommand {
// we need to be able to run multiple tests in the same JVM
// without this cleanup we could have the metrics runner still
running and it will cause subsequent tests to fail
metrics.shutdown()
+ collector.close(context)
Thread.sleep(1000)
println("Stress complete, $runnersExecuted.")
}
}
+ private fun createCollector(): Collector {
+ val collectors = ArrayList<Collector>()
+
+ if (hdrHistogramPrefix != "") {
+ collectors.add(HdrCollector(hdrHistogramPrefix))
+ }
+ if (parquetFile != "") {
+ collectors.add(ParquetCollector(File(parquetFile)))
+ }
+ if (collectors.size == 1) {
+ return collectors[0]
+ }
+ return CompositeCollector(*collectors.toTypedArray())
+ }
+
private fun getRateLimiter(): RateLimiter {
val tmp = RateLimiter.create(rate.toDouble())
tmp.acquire(rate.toInt())
@@ -548,23 +569,23 @@ class Run(val command: String) : IStressCommand {
private fun createRunners(
plugin: Plugin,
- metrics: Metrics,
- fieldRegistry: Registry,
- rateLimiter: RateLimiter?,
+ sharedContext: Context,
): List<ProfileRunner> {
val runners =
IntRange(0, threads - 1).map {
// println("Connecting")
println("Connecting to Cassandra cluster ...")
- val context = StressContext(session, this, it, metrics,
fieldRegistry, rateLimiter)
+ val context = sharedContext.stress(it)
ProfileRunner.create(context, plugin.instance)
}
val executed =
- runners.parallelStream().map {
- println("Preparing statements.")
- it.prepare()
- }.count()
+ runners
+ .parallelStream()
+ .map {
+ println("Preparing statements.")
+ it.prepare()
+ }.count()
println("$executed threads prepared.")
return runners
@@ -646,7 +667,8 @@ class Run(val command: String) : IStressCommand {
for (statement in plugin.instance.schema()) {
val s =
- SchemaBuilder.create(statement)
+ SchemaBuilder
+ .create(statement)
.withCompaction(compaction)
.withCompression(compression)
.withRowCache(rowCache)
diff --git
a/src/main/kotlin/com/rustyrazorblade/easycassstress/workloads/IStressProfile.kt
b/src/main/kotlin/com/rustyrazorblade/easycassstress/workloads/IStressProfile.kt
index e1328e3..039bc25 100644
---
a/src/main/kotlin/com/rustyrazorblade/easycassstress/workloads/IStressProfile.kt
+++
b/src/main/kotlin/com/rustyrazorblade/easycassstress/workloads/IStressProfile.kt
@@ -17,7 +17,6 @@
*/
package com.rustyrazorblade.easycassstress.workloads
-import com.codahale.metrics.Timer.Context
import com.datastax.oss.driver.api.core.CqlSession
import com.datastax.oss.driver.api.core.cql.AsyncResultSet
import com.datastax.oss.driver.api.core.cql.BoundStatement
@@ -42,9 +41,7 @@ interface IStressRunner {
* However, certain workloads may need custom setup.
* @see Locking
**/
- fun getNextPopulate(partitionKey: PartitionKey): Operation {
- return getNextMutation(partitionKey)
- }
+ fun getNextPopulate(partitionKey: PartitionKey): Operation =
getNextMutation(partitionKey)
/**
* Callback after a query executes successfully.
@@ -113,9 +110,7 @@ interface IStressProfile {
*/
fun getFieldGenerators(): Map<Field, FieldGenerator> = mapOf()
- fun getDefaultReadRate(): Double {
- return .01
- }
+ fun getDefaultReadRate(): Double = .01
fun getPopulateOption(args: Run): PopulateOption =
PopulateOption.Standard()
@@ -126,18 +121,25 @@ sealed class Operation(
val bound: BoundStatement? = null,
val statement: String? = null,
) {
- // we're going to track metrics on the mutations differently
- // inserts will also carry data that might be saved for later validation
- // clustering keys won't be realistic to compute in the framework
- lateinit var startTime: Context
+ val createdAtNanos = System.nanoTime()
+ val createdAtMillis = System.currentTimeMillis()
- class Mutation(bound: BoundStatement, val callbackPayload: Any? = null) :
Operation(bound)
+ class Mutation(
+ bound: BoundStatement,
+ val callbackPayload: Any? = null,
+ ) : Operation(bound)
- class SelectStatement(bound: BoundStatement) : Operation(bound)
+ class SelectStatement(
+ bound: BoundStatement,
+ ) : Operation(bound)
- class Deletion(bound: BoundStatement) : Operation(bound)
+ class Deletion(
+ bound: BoundStatement,
+ ) : Operation(bound)
class Stop : Operation(null)
- class DDL(statement: String) : Operation(null, statement = statement)
+ class DDL(
+ statement: String,
+ ) : Operation(null, statement = statement)
}
diff --git
a/src/test/kotlin/com/rustyrazorblade/easycassstress/collector/ParquetCollectorTest.kt
b/src/test/kotlin/com/rustyrazorblade/easycassstress/collector/ParquetCollectorTest.kt
new file mode 100644
index 0000000..0801d81
--- /dev/null
+++
b/src/test/kotlin/com/rustyrazorblade/easycassstress/collector/ParquetCollectorTest.kt
@@ -0,0 +1,65 @@
+package com.rustyrazorblade.easycassstress.collector
+
+import com.datastax.oss.driver.api.core.CqlSession
+import com.google.common.io.Files
+import com.rustyrazorblade.easycassstress.Context
+import com.rustyrazorblade.easycassstress.Metrics
+import com.rustyrazorblade.easycassstress.commands.Run
+import com.rustyrazorblade.easycassstress.generators.Registry
+import io.mockk.mockk
+import org.assertj.core.api.Assertions
+import org.assertj.core.api.Condition
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.io.TempDir
+import java.io.File
+import java.nio.file.Path
+
+internal class ParquetCollectorTest {
+ @TempDir
+ var tempDir: Path? = null
+
+ @Test
+ fun nonExistingDirectory() {
+ val fileOrDirectory =
tempDir!!.resolve("doesnotexist").resolve("noreallydoesnotexist").toFile()
+ val c = ParquetCollector(fileOrDirectory)
+ c.close(ctx)
+
+ Assertions.assertThat(fileOrDirectory).exists().isFile
+ }
+
+ @Test
+ fun existingDirectory() {
+ val expected = tempDir!!.resolve("rawlog.parquet").toFile()
+ val c = ParquetCollector(tempDir!!.toFile())
+ c.close(ctx)
+
+ Assertions.assertThat(expected).exists().isFile
+ }
+
+ @Test
+ fun existingFile() {
+ val expected = tempDir!!.resolve("rawlog.parquet").toFile()
+ val unexpectedBytes = "some text".toByteArray()
+ Files.write(unexpectedBytes, expected)
+ val c = ParquetCollector(expected)
+ c.close(ctx)
+
+ Assertions.assertThat(expected).exists().isFile.doesNotHave(
+ object : Condition<File>() {
+ override fun matches(value: File?): Boolean =
Files.toByteArray(value).equals(unexpectedBytes)
+ },
+ )
+ }
+
+ companion object {
+ val ctx =
+ Context(
+ mockk<CqlSession>(),
+ mockk<Run>(),
+ mockk<Metrics>(),
+ mockk<Registry>(),
+ null,
+ mockk<Collector>(),
+ )
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]