This is an automated email from the ASF dual-hosted git repository. yhu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 8955124219c Add BigTableIO Stress test (#30630) 8955124219c is described below commit 8955124219cf9bd6dcea74f22c70fed7940da2b8 Author: akashorabek <70029317+akashora...@users.noreply.github.com> AuthorDate: Thu Mar 21 06:07:57 2024 +0600 Add BigTableIO Stress test (#30630) * Add BigTableIO Stress test * refactor * update dependency tree * refactor * Add stress test files to grpc/protobuff exception ignore list * move exportMetrics method to IOLoadTestBase class * refactor * refactor --- it/google-cloud-platform/build.gradle | 5 +- .../org/apache/beam/it/gcp/IOLoadTestBase.java | 34 ++ .../org/apache/beam/it/gcp/IOStressTestBase.java | 123 +++++++ .../apache/beam/it/gcp/bigquery/BigQueryIOST.java | 105 +----- .../apache/beam/it/gcp/bigtable/BigTableIOST.java | 389 +++++++++++++++++++++ it/kafka/build.gradle | 4 +- .../java/org/apache/beam/it/kafka/KafkaIOST.java | 156 ++------- .../resources/beam/checkstyle/suppressions.xml | 1 + 8 files changed, 578 insertions(+), 239 deletions(-) diff --git a/it/google-cloud-platform/build.gradle b/it/google-cloud-platform/build.gradle index 9717b5f8c84..3353a9692cb 100644 --- a/it/google-cloud-platform/build.gradle +++ b/it/google-cloud-platform/build.gradle @@ -32,6 +32,7 @@ dependencies { implementation project(path: ":runners:google-cloud-dataflow-java") implementation project(path: ":it:conditions", configuration: "shadow") implementation project(path: ":it:truthmatchers", configuration: "shadow") + implementation project(path: ":sdks:java:testing:test-utils") implementation library.java.slf4j_api implementation library.java.vendored_guava_32_1_2_jre implementation library.java.jackson_core @@ -48,6 +49,7 @@ dependencies { implementation library.java.protobuf_java implementation library.java.threetenbp implementation 'org.awaitility:awaitility:4.2.0' + implementation 'joda-time:joda-time:2.10.10' // Google Cloud Dependencies implementation library.java.google_api_services_bigquery implementation library.java.google_cloud_core @@ -71,7 +73,6 @@ dependencies { implementation 'com.google.cloud:google-cloud-secretmanager' provided 'com.google.api.grpc:proto-google-cloud-secretmanager-v1' - testImplementation project(path: ":sdks:java:testing:test-utils") testImplementation project(path: ":sdks:java:io:google-cloud-platform") testImplementation project(path: ":sdks:java:extensions:protobuf", configuration: "testRuntimeMigration") testImplementation project(path: ":sdks:java:io:synthetic") @@ -83,6 +84,8 @@ dependencies { tasks.register("GCSPerformanceTest", IoPerformanceTestUtilities.IoPerformanceTest, project, 'google-cloud-platform', 'FileBasedIOLT', ['configuration':'large','project':'apache-beam-testing', 'artifactBucket':'io-performance-temp']) tasks.register("BigTablePerformanceTest", IoPerformanceTestUtilities.IoPerformanceTest, project, 'google-cloud-platform', 'BigTableIOLT', ['configuration':'large','project':'apache-beam-testing', 'artifactBucket':'io-performance-temp']) +tasks.register("BigTableStressTestMedium", IoPerformanceTestUtilities.IoPerformanceTest, project, 'google-cloud-platform', 'BigTableIOST', ['configuration':'medium','project':'apache-beam-testing', 'artifactBucket':'io-performance-temp']) +tasks.register("BigTableStressTestLarge", IoPerformanceTestUtilities.IoPerformanceTest, project, 'google-cloud-platform', 'BigTableIOST', ['configuration':'large','project':'apache-beam-testing', 'artifactBucket':'io-performance-temp']) tasks.register("BigQueryPerformanceTest", IoPerformanceTestUtilities.IoPerformanceTest, project, 'google-cloud-platform', 'BigQueryIOLT', ['configuration':'medium','project':'apache-beam-testing', 'artifactBucket':'io-performance-temp']) tasks.register("BigQueryStressTestMedium", IoPerformanceTestUtilities.IoPerformanceTest, project, 'google-cloud-platform', 'BigQueryIOST', ['configuration':'medium','project':'apache-beam-testing', 'artifactBucket':'io-performance-temp']) tasks.register("BigQueryStressTestLarge", IoPerformanceTestUtilities.IoPerformanceTest, project, 'google-cloud-platform', 'BigQueryIOST', ['configuration':'large','project':'apache-beam-testing', 'artifactBucket':'io-performance-temp']) diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/IOLoadTestBase.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/IOLoadTestBase.java index 6b728a6a60d..e5f20c07c01 100644 --- a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/IOLoadTestBase.java +++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/IOLoadTestBase.java @@ -17,12 +17,21 @@ */ package org.apache.beam.it.gcp; +import com.google.cloud.Timestamp; import java.io.IOException; +import java.text.ParseException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; +import java.util.UUID; import org.apache.beam.it.common.PipelineLauncher; import org.apache.beam.it.common.TestProperties; import org.apache.beam.it.gcp.dataflow.DefaultPipelineLauncher; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.testutils.NamedTestResult; +import org.apache.beam.sdk.testutils.metrics.IOITMetrics; +import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings; import org.apache.beam.sdk.transforms.DoFn; import org.junit.After; import org.junit.Before; @@ -101,4 +110,29 @@ public class IOLoadTestBase extends LoadTestBase { public static String getBeamMetricsName(PipelineMetricsType metricstype, String metricsName) { return BEAM_METRICS_NAMESPACE + ":" + metricstype + ":" + metricsName; } + + /** Exports test metrics to InfluxDB or BigQuery depending on the configuration. */ + protected void exportMetrics( + PipelineLauncher.LaunchInfo launchInfo, + MetricsConfiguration metricsConfig, + boolean exportToInfluxDB, + InfluxDBSettings influxDBSettings) + throws IOException, ParseException, InterruptedException { + + Map<String, Double> metrics = getMetrics(launchInfo, metricsConfig); + String testId = UUID.randomUUID().toString(); + String testTimestamp = Timestamp.now().toString(); + + if (exportToInfluxDB) { + Collection<NamedTestResult> namedTestResults = new ArrayList<>(); + for (Map.Entry<String, Double> entry : metrics.entrySet()) { + NamedTestResult metricResult = + NamedTestResult.create(testId, testTimestamp, entry.getKey(), entry.getValue()); + namedTestResults.add(metricResult); + } + IOITMetrics.publishToInflux(testId, testTimestamp, namedTestResults, influxDBSettings); + } else { + exportMetricsToBigQuery(launchInfo, metrics); + } + } } diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/IOStressTestBase.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/IOStressTestBase.java new file mode 100644 index 00000000000..5c2fb74cd2f --- /dev/null +++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/IOStressTestBase.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.it.gcp; + +import java.io.Serializable; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.sdk.transforms.DoFn; +import org.joda.time.Instant; + +/** Base class for IO Stress tests. */ +public class IOStressTestBase extends IOLoadTestBase { + /** + * The load will initiate at 1x, progressively increase to 2x and 4x, then decrease to 2x and + * eventually return to 1x. + */ + protected static final int[] DEFAULT_LOAD_INCREASE_ARRAY = {1, 2, 2, 4, 2, 1}; + + protected static final int DEFAULT_ROWS_PER_SECOND = 1000; + + /** + * Generates and returns a list of LoadPeriod instances representing periods of load increase + * based on the specified load increase array and total duration in minutes. + * + * @param minutesTotal The total duration in minutes for which the load periods are generated. + * @return A list of LoadPeriod instances defining periods of load increase. + */ + protected List<LoadPeriod> getLoadPeriods(int minutesTotal, int[] loadIncreaseArray) { + + List<LoadPeriod> loadPeriods = new ArrayList<>(); + long periodDurationMillis = + Duration.ofMinutes(minutesTotal / loadIncreaseArray.length).toMillis(); + long startTimeMillis = 0; + + for (int loadIncreaseMultiplier : loadIncreaseArray) { + long endTimeMillis = startTimeMillis + periodDurationMillis; + loadPeriods.add(new LoadPeriod(loadIncreaseMultiplier, startTimeMillis, endTimeMillis)); + + startTimeMillis = endTimeMillis; + } + return loadPeriods; + } + + /** + * Represents a period of time with associated load increase properties for stress testing + * scenarios. + */ + protected static class LoadPeriod implements Serializable { + private final int loadIncreaseMultiplier; + private final long periodStartMillis; + private final long periodEndMillis; + + public LoadPeriod(int loadIncreaseMultiplier, long periodStartMillis, long periodEndMin) { + this.loadIncreaseMultiplier = loadIncreaseMultiplier; + this.periodStartMillis = periodStartMillis; + this.periodEndMillis = periodEndMin; + } + + public int getLoadIncreaseMultiplier() { + return loadIncreaseMultiplier; + } + + public long getPeriodStartMillis() { + return periodStartMillis; + } + + public long getPeriodEndMillis() { + return periodEndMillis; + } + } + + /** + * Custom Apache Beam DoFn designed for use in stress testing scenarios. It introduces a dynamic + * load increase over time, multiplying the input elements based on the elapsed time since the + * start of processing. This class aims to simulate various load levels during stress testing. + */ + protected static class MultiplierDoFn<T> extends DoFn<T, T> { + private final int startMultiplier; + private final long startTimesMillis; + private final List<LoadPeriod> loadPeriods; + + public MultiplierDoFn(int startMultiplier, List<LoadPeriod> loadPeriods) { + this.startMultiplier = startMultiplier; + this.startTimesMillis = Instant.now().getMillis(); + this.loadPeriods = loadPeriods; + } + + @DoFn.ProcessElement + public void processElement( + @Element T element, OutputReceiver<T> outputReceiver, @DoFn.Timestamp Instant timestamp) { + + int multiplier = this.startMultiplier; + long elapsedTimeMillis = timestamp.getMillis() - startTimesMillis; + + for (LoadPeriod loadPeriod : loadPeriods) { + if (elapsedTimeMillis >= loadPeriod.getPeriodStartMillis() + && elapsedTimeMillis < loadPeriod.getPeriodEndMillis()) { + multiplier *= loadPeriod.getLoadIncreaseMultiplier(); + break; + } + } + for (int i = 0; i < multiplier; i++) { + outputReceiver.output(element); + } + } + } +} diff --git a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigquery/BigQueryIOST.java b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigquery/BigQueryIOST.java index 6ffe1014c8a..d0eed457b19 100644 --- a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigquery/BigQueryIOST.java +++ b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigquery/BigQueryIOST.java @@ -26,7 +26,6 @@ import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; import com.google.cloud.Timestamp; import java.io.IOException; -import java.io.Serializable; import java.nio.ByteBuffer; import java.text.ParseException; import java.time.Duration; @@ -46,7 +45,7 @@ import org.apache.beam.it.common.PipelineLauncher; import org.apache.beam.it.common.PipelineOperator; import org.apache.beam.it.common.TestProperties; import org.apache.beam.it.common.utils.ResourceManagerUtils; -import org.apache.beam.it.gcp.IOLoadTestBase; +import org.apache.beam.it.gcp.IOStressTestBase; import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.io.gcp.bigquery.AvroWriteRequest; @@ -59,7 +58,6 @@ import org.apache.beam.sdk.testing.TestPipelineOptions; import org.apache.beam.sdk.testutils.NamedTestResult; import org.apache.beam.sdk.testutils.metrics.IOITMetrics; import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.PeriodicImpulse; @@ -70,7 +68,6 @@ import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Longs; -import org.joda.time.Instant; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; @@ -87,18 +84,11 @@ import org.junit.Test; * - To run large-scale stress tests: {@code gradle * :it:google-cloud-platform:BigQueryStressTestLarge} */ -public final class BigQueryIOST extends IOLoadTestBase { +public final class BigQueryIOST extends IOStressTestBase { private static final String READ_ELEMENT_METRIC_NAME = "read_count"; private static final String TEST_ID = UUID.randomUUID().toString(); private static final String TEST_TIMESTAMP = Timestamp.now().toString(); - private static final int DEFAULT_ROWS_PER_SECOND = 1000; - - /** - * The load will initiate at 1x, progressively increase to 2x and 4x, then decrease to 2x and - * eventually return to 1x. - */ - private static final int[] DEFAULT_LOAD_INCREASE_ARRAY = {1, 2, 2, 4, 2, 1}; private static BigQueryResourceManager resourceManager; private static String tableQualifier; @@ -301,7 +291,7 @@ public final class BigQueryIOST extends IOLoadTestBase { source .apply( "One input to multiple outputs", - ParDo.of(new MultiplierDoFn(startMultiplier, loadPeriods))) + ParDo.of(new MultiplierDoFn<>(startMultiplier, loadPeriods))) .apply("Reshuffle fanout", Reshuffle.viaRandomKey()) .apply("Counting element", ParDo.of(new CountingFn<>(READ_ELEMENT_METRIC_NAME))); } @@ -371,44 +361,6 @@ public final class BigQueryIOST extends IOLoadTestBase { } } - /** - * Custom Apache Beam DoFn designed for use in stress testing scenarios. It introduces a dynamic - * load increase over time, multiplying the input elements based on the elapsed time since the - * start of processing. This class aims to simulate various load levels during stress testing. - */ - private static class MultiplierDoFn extends DoFn<byte[], byte[]> { - private final int startMultiplier; - private final long startTimesMillis; - private final List<LoadPeriod> loadPeriods; - - MultiplierDoFn(int startMultiplier, List<LoadPeriod> loadPeriods) { - this.startMultiplier = startMultiplier; - this.startTimesMillis = Instant.now().getMillis(); - this.loadPeriods = loadPeriods; - } - - @ProcessElement - public void processElement( - @Element byte[] element, - OutputReceiver<byte[]> outputReceiver, - @DoFn.Timestamp Instant timestamp) { - - int multiplier = this.startMultiplier; - long elapsedTimeMillis = timestamp.getMillis() - startTimesMillis; - - for (LoadPeriod loadPeriod : loadPeriods) { - if (elapsedTimeMillis >= loadPeriod.getPeriodStartMillis() - && elapsedTimeMillis < loadPeriod.getPeriodEndMillis()) { - multiplier *= loadPeriod.getLoadIncreaseMultiplier(); - break; - } - } - for (int i = 0; i < multiplier; i++) { - outputReceiver.output(element); - } - } - } - abstract static class FormatFn<InputT, OutputT> implements SerializableFunction<InputT, OutputT> { protected final int numColumns; @@ -493,29 +445,6 @@ public final class BigQueryIOST extends IOLoadTestBase { } } - /** - * Generates and returns a list of LoadPeriod instances representing periods of load increase - * based on the specified load increase array and total duration in minutes. - * - * @param minutesTotal The total duration in minutes for which the load periods are generated. - * @return A list of LoadPeriod instances defining periods of load increase. - */ - private List<LoadPeriod> getLoadPeriods(int minutesTotal, int[] loadIncreaseArray) { - - List<LoadPeriod> loadPeriods = new ArrayList<>(); - long periodDurationMillis = - Duration.ofMinutes(minutesTotal / loadIncreaseArray.length).toMillis(); - long startTimeMillis = 0; - - for (int loadIncreaseMultiplier : loadIncreaseArray) { - long endTimeMillis = startTimeMillis + periodDurationMillis; - loadPeriods.add(new LoadPeriod(loadIncreaseMultiplier, startTimeMillis, endTimeMillis)); - - startTimeMillis = endTimeMillis; - } - return loadPeriods; - } - private enum WriteFormat { AVRO, JSON @@ -573,32 +502,4 @@ public final class BigQueryIOST extends IOLoadTestBase { /** InfluxDB database to publish metrics. * */ @JsonProperty public String influxDatabase; } - - /** - * Represents a period of time with associated load increase properties for stress testing - * scenarios. - */ - private static class LoadPeriod implements Serializable { - private final int loadIncreaseMultiplier; - private final long periodStartMillis; - private final long periodEndMillis; - - public LoadPeriod(int loadIncreaseMultiplier, long periodStartMillis, long periodEndMin) { - this.loadIncreaseMultiplier = loadIncreaseMultiplier; - this.periodStartMillis = periodStartMillis; - this.periodEndMillis = periodEndMin; - } - - public int getLoadIncreaseMultiplier() { - return loadIncreaseMultiplier; - } - - public long getPeriodStartMillis() { - return periodStartMillis; - } - - public long getPeriodEndMillis() { - return periodEndMillis; - } - } } diff --git a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigtable/BigTableIOST.java b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigtable/BigTableIOST.java new file mode 100644 index 00000000000..4821992381b --- /dev/null +++ b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigtable/BigTableIOST.java @@ -0,0 +1,389 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.it.gcp.bigtable; + +import static org.apache.beam.it.gcp.bigtable.BigtableResourceManagerUtils.generateTableId; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.bigtable.v2.Mutation; +import com.google.protobuf.ByteString; +import java.io.IOException; +import java.io.Serializable; +import java.text.ParseException; +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Random; +import java.util.UUID; +import org.apache.beam.it.common.PipelineLauncher; +import org.apache.beam.it.common.PipelineOperator; +import org.apache.beam.it.common.TestProperties; +import org.apache.beam.it.common.utils.ResourceManagerUtils; +import org.apache.beam.it.gcp.IOStressTestBase; +import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions; +import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO; +import org.apache.beam.sdk.io.synthetic.SyntheticSourceOptions; +import org.apache.beam.sdk.options.StreamingOptions; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.TestPipelineOptions; +import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.PeriodicImpulse; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +/** + * BigTableIO stress test. The test is designed to assess the performance of BigTableIO under + * various conditions. + * + * <p>Usage: <br> + * - To run medium-scale stress tests: {@code gradle + * :it:google-cloud-platform:BigTableStressTestMedium} - To run large-scale stress tests: {@code + * gradle :it:google-cloud-platform:BigTableStressTestLarge} + */ +public final class BigTableIOST extends IOStressTestBase { + + private static final String WRITE_ELEMENT_METRIC_NAME = "write_count"; + private static final String READ_ELEMENT_METRIC_NAME = "read_count"; + private static final String COLUMN_FAMILY_NAME = "cf"; + private static final long TABLE_MAX_AGE_MINUTES = 800L; + + private BigtableResourceManager resourceManager; + private InfluxDBSettings influxDBSettings; + private Configuration configuration; + private String testConfigName; + private String tableId; + + @Rule public TestPipeline writePipeline = TestPipeline.create(); + @Rule public TestPipeline readPipeline = TestPipeline.create(); + + @Before + public void setup() throws IOException { + resourceManager = + BigtableResourceManager.builder(testName, project, CREDENTIALS_PROVIDER).build(); + + // create table + tableId = generateTableId(testName); + resourceManager.createTable( + tableId, + ImmutableList.of(COLUMN_FAMILY_NAME), + org.threeten.bp.Duration.ofMinutes(TABLE_MAX_AGE_MINUTES)); + + // parse configuration + testConfigName = + TestProperties.getProperty("configuration", "medium", TestProperties.Type.PROPERTY); + configuration = TEST_CONFIGS_PRESET.get(testConfigName); + if (configuration == null) { + try { + configuration = Configuration.fromJsonString(testConfigName, Configuration.class); + } catch (IOException e) { + throw new IllegalArgumentException( + String.format( + "Unknown test configuration: [%s]. Pass to a valid configuration json, or use" + + " config presets: %s", + testConfigName, TEST_CONFIGS_PRESET.keySet())); + } + } + + // tempLocation needs to be set for DataflowRunner + if (!Strings.isNullOrEmpty(tempBucketName)) { + String tempLocation = String.format("gs://%s/temp/", tempBucketName); + writePipeline.getOptions().as(TestPipelineOptions.class).setTempRoot(tempLocation); + writePipeline.getOptions().setTempLocation(tempLocation); + readPipeline.getOptions().as(TestPipelineOptions.class).setTempRoot(tempLocation); + readPipeline.getOptions().setTempLocation(tempLocation); + } + // Use streaming pipeline to write records + writePipeline.getOptions().as(StreamingOptions.class).setStreaming(true); + } + + @After + public void teardown() { + ResourceManagerUtils.cleanResources(resourceManager); + } + + private static final Map<String, Configuration> TEST_CONFIGS_PRESET; + + static { + try { + TEST_CONFIGS_PRESET = + ImmutableMap.of( + "medium", + Configuration.fromJsonString( + "{\"rowsPerSecond\":25000,\"minutes\":40,\"pipelineTimeout\":120,\"valueSizeBytes\":100,\"runner\":\"DataflowRunner\"}", + Configuration.class), + "large", + Configuration.fromJsonString( + "{\"rowsPerSecond\":25000,\"minutes\":130,\"pipelineTimeout\":200,\"valueSizeBytes\":1000,\"runner\":\"DataflowRunner\"}", + Configuration.class)); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** Run stress test with configurations specified by TestProperties. */ + @Test + public void runTest() throws IOException, ParseException, InterruptedException { + if (configuration.exportMetricsToInfluxDB) { + influxDBSettings = + InfluxDBSettings.builder() + .withHost(configuration.influxHost) + .withDatabase(configuration.influxDatabase) + .withMeasurement(configuration.influxMeasurement + "_" + testConfigName) + .get(); + } + + PipelineLauncher.LaunchInfo writeInfo = generateDataAndWrite(); + PipelineOperator.Result writeResult = + pipelineOperator.waitUntilDone( + createConfig(writeInfo, Duration.ofMinutes(configuration.pipelineTimeout))); + assertNotEquals(PipelineOperator.Result.LAUNCH_FAILED, writeResult); + + PipelineLauncher.LaunchInfo readInfo = readData(); + PipelineOperator.Result readResult = + pipelineOperator.waitUntilDone( + createConfig(readInfo, Duration.ofMinutes(configuration.pipelineTimeout))); + assertNotEquals(PipelineOperator.Result.LAUNCH_FAILED, readResult); + + try { + double writeNumRecords = + pipelineLauncher.getMetric( + project, + region, + writeInfo.jobId(), + getBeamMetricsName(PipelineMetricsType.COUNTER, WRITE_ELEMENT_METRIC_NAME)); + + double readNumRecords = + pipelineLauncher.getMetric( + project, + region, + readInfo.jobId(), + getBeamMetricsName(PipelineMetricsType.COUNTER, READ_ELEMENT_METRIC_NAME)); + + assertEquals(writeNumRecords, readNumRecords, 0); + } finally { + // clean up write streaming pipeline + if (pipelineLauncher.getJobStatus(project, region, writeInfo.jobId()) + == PipelineLauncher.JobState.RUNNING) { + pipelineLauncher.cancelJob(project, region, writeInfo.jobId()); + } + } + + // export metrics + MetricsConfiguration writeMetricsConfig = + MetricsConfiguration.builder() + .setOutputPCollection("Counting element.out0") + .setOutputPCollectionV2("Counting element/ParMultiDo(Counting).out0") + .build(); + + MetricsConfiguration readMetricsConfig = + MetricsConfiguration.builder() + .setOutputPCollection("Counting element.out0") + .setOutputPCollectionV2("Counting element/ParMultiDo(Counting).out0") + .build(); + + exportMetrics( + writeInfo, writeMetricsConfig, configuration.exportMetricsToInfluxDB, influxDBSettings); + exportMetrics( + readInfo, readMetricsConfig, configuration.exportMetricsToInfluxDB, influxDBSettings); + } + + /** + * The method creates a pipeline to simulate data generation and write operations to BigTable, + * based on the specified configuration parameters. The stress test involves varying the load + * dynamically over time, with options to use configurable parameters. + */ + private PipelineLauncher.LaunchInfo generateDataAndWrite() throws IOException { + // The PeriodicImpulse source will generate an element every this many millis: + int fireInterval = 1; + // Each element from PeriodicImpulse will fan out to this many elements: + int startMultiplier = + Math.max(configuration.rowsPerSecond, DEFAULT_ROWS_PER_SECOND) / DEFAULT_ROWS_PER_SECOND; + long stopAfterMillis = + org.joda.time.Duration.standardMinutes(configuration.minutes).getMillis(); + long totalRows = startMultiplier * stopAfterMillis / fireInterval; + List<LoadPeriod> loadPeriods = + getLoadPeriods(configuration.minutes, DEFAULT_LOAD_INCREASE_ARRAY); + + PCollection<org.joda.time.Instant> source = + writePipeline.apply( + PeriodicImpulse.create() + .stopAfter(org.joda.time.Duration.millis(stopAfterMillis - 1)) + .withInterval(org.joda.time.Duration.millis(fireInterval))); + if (startMultiplier > 1) { + source = + source + .apply( + "One input to multiple outputs", + ParDo.of(new MultiplierDoFn<>(startMultiplier, loadPeriods))) + .apply("Counting element", ParDo.of(new CountingFn<>(WRITE_ELEMENT_METRIC_NAME))); + } + source + .apply( + "Map records to BigTable format", + ParDo.of(new MapToBigTableFormat((int) configuration.valueSizeBytes, (int) totalRows))) + .apply( + "Write to BigTable", + BigtableIO.write() + .withProjectId(project) + .withInstanceId(resourceManager.getInstanceId()) + .withTableId(tableId)); + + PipelineLauncher.LaunchConfig options = + PipelineLauncher.LaunchConfig.builder("write-bigtable") + .setSdk(PipelineLauncher.Sdk.JAVA) + .setPipeline(writePipeline) + .addParameter("runner", configuration.runner) + .addParameter( + "autoscalingAlgorithm", + DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType.THROUGHPUT_BASED + .toString()) + .addParameter("numWorkers", String.valueOf(configuration.numWorkers)) + .addParameter("maxNumWorkers", String.valueOf(configuration.maxNumWorkers)) + .addParameter("experiments", "use_runner_v2") + .build(); + + return pipelineLauncher.launch(project, region, options); + } + + /** The method reads data from BigTable in batch mode. */ + private PipelineLauncher.LaunchInfo readData() throws IOException { + BigtableIO.Read readIO = + BigtableIO.read() + .withoutValidation() + .withProjectId(project) + .withInstanceId(resourceManager.getInstanceId()) + .withTableId(tableId); + + readPipeline + .apply("Read from BigTable", readIO) + .apply("Counting element", ParDo.of(new CountingFn<>(READ_ELEMENT_METRIC_NAME))); + + PipelineLauncher.LaunchConfig options = + PipelineLauncher.LaunchConfig.builder("read-bigtable") + .setSdk(PipelineLauncher.Sdk.JAVA) + .setPipeline(readPipeline) + .addParameter("runner", configuration.runner) + .addParameter( + "autoscalingAlgorithm", + DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType.THROUGHPUT_BASED + .toString()) + .addParameter("numWorkers", String.valueOf(configuration.numWorkers)) + .addParameter("maxNumWorkers", String.valueOf(configuration.maxNumWorkers)) + .build(); + + return pipelineLauncher.launch(project, region, options); + } + + /** Options for BigTableIO stress test. */ + static class Configuration extends SyntheticSourceOptions { + /** Pipeline timeout in minutes. Must be a positive value. */ + @JsonProperty public int pipelineTimeout = 20; + + /** Runner specified to run the pipeline. */ + @JsonProperty public String runner = "DirectRunner"; + + /** Number of workers for the pipeline. */ + @JsonProperty public int numWorkers = 20; + + /** Maximum number of workers for the pipeline. */ + @JsonProperty public int maxNumWorkers = 100; + + /** + * Rate of generated elements sent to the source table. Will run with a minimum of 1k rows per + * second. + */ + @JsonProperty public int rowsPerSecond = DEFAULT_ROWS_PER_SECOND; + + /** Rows will be generated for this many minutes. */ + @JsonProperty public int minutes = 15; + + /** + * Determines the destination for exporting metrics. If set to true, metrics will be exported to + * InfluxDB and displayed using Grafana. If set to false, metrics will be exported to BigQuery + * and displayed with Looker Studio. + */ + @JsonProperty public boolean exportMetricsToInfluxDB = false; + + /** InfluxDB measurement to publish results to. * */ + @JsonProperty public String influxMeasurement = BigTableIOST.class.getName(); + + /** InfluxDB host to publish metrics. * */ + @JsonProperty public String influxHost; + + /** InfluxDB database to publish metrics. * */ + @JsonProperty public String influxDatabase; + } + + /** Maps Instant to the BigTable format record. */ + private static class MapToBigTableFormat + extends DoFn<org.joda.time.Instant, KV<ByteString, Iterable<Mutation>>> + implements Serializable { + + private final int valueSizeBytes; + private final int totalRows; + + public MapToBigTableFormat(int valueSizeBytes, int totalRows) { + this.valueSizeBytes = valueSizeBytes; + this.totalRows = totalRows; + } + + @ProcessElement + public void processElement(ProcessContext c) { + long index = Objects.requireNonNull(c.element()).getMillis() % totalRows; + + ByteString key = + ByteString.copyFromUtf8( + String.format( + "key%s", + index + + "-" + + UUID.randomUUID() + + "-" + + UUID.randomUUID() + + "-" + + org.joda.time.Instant.now().getMillis())); + Random random = new Random(index); + byte[] valBytes = new byte[this.valueSizeBytes]; + random.nextBytes(valBytes); + ByteString value = ByteString.copyFrom(valBytes); + + Iterable<Mutation> mutations = + ImmutableList.of( + Mutation.newBuilder() + .setSetCell( + Mutation.SetCell.newBuilder() + .setValue(value) + .setTimestampMicros(java.time.Instant.now().toEpochMilli() * 1000L) + .setFamilyName(COLUMN_FAMILY_NAME)) + .build()); + c.output(KV.of(key, mutations)); + } + } +} diff --git a/it/kafka/build.gradle b/it/kafka/build.gradle index b1b8147e72a..96f915a1d84 100644 --- a/it/kafka/build.gradle +++ b/it/kafka/build.gradle @@ -46,5 +46,5 @@ dependencies { testImplementation project(path: ":sdks:java:extensions:google-cloud-platform-core", configuration: "testRuntimeMigration") } -tasks.register("KafkaStressTestMedium", IoPerformanceTestUtilities.IoPerformanceTest, project, 'kafka', 'KafkaIOST', ['configuration':'medium','bootstrapServers':System.getProperty("bootstrapServers"),'project':'apache-beam-testing', 'artifactBucket':'io-performance-temp']) -tasks.register("KafkaStressTestLarge", IoPerformanceTestUtilities.IoPerformanceTest, project, 'kafka', 'KafkaIOST', ['configuration':'large','bootstrapServers':System.getProperty("bootstrapServers"),'project':'apache-beam-testing', 'artifactBucket':'io-performance-temp']) +tasks.register("KafkaStressTestMedium", IoPerformanceTestUtilities.IoPerformanceTest, project, 'kafka', 'KafkaIOST', ['configuration':'medium','bootstrapServers':System.getProperty("bootstrapServers"),'useDataflowRunnerV2':System.getProperty("useDataflowRunnerV2"),'project':'apache-beam-testing', 'artifactBucket':'io-performance-temp']) +tasks.register("KafkaStressTestLarge", IoPerformanceTestUtilities.IoPerformanceTest, project, 'kafka', 'KafkaIOST', ['configuration':'large','bootstrapServers':System.getProperty("bootstrapServers"),'useDataflowRunnerV2':System.getProperty("useDataflowRunnerV2"),'project':'apache-beam-testing', 'artifactBucket':'io-performance-temp']) diff --git a/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaIOST.java b/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaIOST.java index 2a830400a0a..4ca34328637 100644 --- a/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaIOST.java +++ b/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaIOST.java @@ -21,15 +21,11 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.cloud.Timestamp; import java.io.IOException; -import java.io.Serializable; import java.text.ParseException; import java.time.Duration; import java.time.ZoneId; import java.time.format.DateTimeFormatter; -import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; @@ -37,17 +33,14 @@ import java.util.UUID; import org.apache.beam.it.common.PipelineLauncher; import org.apache.beam.it.common.PipelineOperator; import org.apache.beam.it.common.TestProperties; -import org.apache.beam.it.gcp.IOLoadTestBase; +import org.apache.beam.it.gcp.IOStressTestBase; import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions; import org.apache.beam.sdk.io.kafka.KafkaIO; import org.apache.beam.sdk.io.synthetic.SyntheticSourceOptions; import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestPipelineOptions; -import org.apache.beam.sdk.testutils.NamedTestResult; -import org.apache.beam.sdk.testutils.metrics.IOITMetrics; import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.PeriodicImpulse; @@ -61,7 +54,6 @@ import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.ByteArraySerializer; -import org.joda.time.Instant; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -81,17 +73,10 @@ import org.junit.Test; * - To run large-scale stress tests: {@code gradle :it:kafka:KafkaStressTestLarge * -DbootstrapServers="0.0.0.0:32400,1.1.1.1:32400"} */ -public final class KafkaIOST extends IOLoadTestBase { - /** - * The load will initiate at 1x, progressively increase to 2x and 4x, then decrease to 2x and - * eventually return to 1x. - */ - private static final int[] DEFAULT_LOAD_INCREASE_ARRAY = {1, 2, 2, 4, 2, 1}; - - private static InfluxDBSettings influxDBSettings; +public final class KafkaIOST extends IOStressTestBase { private static final String WRITE_ELEMENT_METRIC_NAME = "write_count"; private static final String READ_ELEMENT_METRIC_NAME = "read_count"; - private static final int DEFAULT_ROWS_PER_SECOND = 1000; + private static InfluxDBSettings influxDBSettings; private Configuration configuration; private AdminClient adminClient; private String testConfigName; @@ -120,6 +105,11 @@ public final class KafkaIOST extends IOLoadTestBase { } configuration.bootstrapServers = TestProperties.getProperty("bootstrapServers", null, TestProperties.Type.PROPERTY); + String useDataflowRunnerV2FromProps = + TestProperties.getProperty("useDataflowRunnerV2", "true", TestProperties.Type.PROPERTY); + if (!useDataflowRunnerV2FromProps.isEmpty()) { + configuration.useDataflowRunnerV2 = Boolean.parseBoolean(useDataflowRunnerV2FromProps); + } adminClient = AdminClient.create(ImmutableMap.of("bootstrap.servers", configuration.bootstrapServers)); @@ -231,8 +221,10 @@ public final class KafkaIOST extends IOLoadTestBase { .setOutputPCollectionV2("Counting element/ParMultiDo(Counting).out0") .build(); - exportMetrics(writeInfo, writeMetricsConfig); - exportMetrics(readInfo, readMetricsConfig); + exportMetrics( + writeInfo, writeMetricsConfig, configuration.exportMetricsToInfluxDB, influxDBSettings); + exportMetrics( + readInfo, readMetricsConfig, configuration.exportMetricsToInfluxDB, influxDBSettings); } /** @@ -267,7 +259,7 @@ public final class KafkaIOST extends IOLoadTestBase { source .apply( "One input to multiple outputs", - ParDo.of(new MultiplierDoFn(startMultiplier, loadPeriods))) + ParDo.of(new MultiplierDoFn<>(startMultiplier, loadPeriods))) .apply("Reshuffle fanout", Reshuffle.viaRandomKey()) .apply("Counting element", ParDo.of(new CountingFn<>(WRITE_ELEMENT_METRIC_NAME))); } @@ -295,7 +287,7 @@ public final class KafkaIOST extends IOLoadTestBase { .toString()) .addParameter("numWorkers", String.valueOf(configuration.numWorkers)) .addParameter("maxNumWorkers", String.valueOf(configuration.maxNumWorkers)) - .addParameter("experiments", "use_runner_v2") + .addParameter("experiments", configuration.useDataflowRunnerV2 ? "use_runner_v2" : "") .build(); return pipelineLauncher.launch(project, region, options); @@ -310,7 +302,7 @@ public final class KafkaIOST extends IOLoadTestBase { .withConsumerConfigUpdates(ImmutableMap.of("auto.offset.reset", "earliest")); readPipeline - .apply("Read from unbounded Kafka", readFromKafka) + .apply("Read from Kafka", readFromKafka) .apply("Counting element", ParDo.of(new CountingFn<>(READ_ELEMENT_METRIC_NAME))); PipelineLauncher.LaunchConfig options = @@ -319,94 +311,12 @@ public final class KafkaIOST extends IOLoadTestBase { .setPipeline(readPipeline) .addParameter("numWorkers", String.valueOf(configuration.numWorkers)) .addParameter("runner", configuration.runner) - .addParameter("experiments", "use_runner_v2") + .addParameter("experiments", configuration.useDataflowRunnerV2 ? "use_runner_v2" : "") .build(); return pipelineLauncher.launch(project, region, options); } - private void exportMetrics( - PipelineLauncher.LaunchInfo launchInfo, MetricsConfiguration metricsConfig) - throws IOException, ParseException, InterruptedException { - - Map<String, Double> metrics = getMetrics(launchInfo, metricsConfig); - String testId = UUID.randomUUID().toString(); - String testTimestamp = Timestamp.now().toString(); - - if (configuration.exportMetricsToInfluxDB) { - Collection<NamedTestResult> namedTestResults = new ArrayList<>(); - for (Map.Entry<String, Double> entry : metrics.entrySet()) { - NamedTestResult metricResult = - NamedTestResult.create(testId, testTimestamp, entry.getKey(), entry.getValue()); - namedTestResults.add(metricResult); - } - IOITMetrics.publishToInflux(testId, testTimestamp, namedTestResults, influxDBSettings); - } else { - exportMetricsToBigQuery(launchInfo, metrics); - } - } - - /** - * Custom Apache Beam DoFn designed for use in stress testing scenarios. It introduces a dynamic - * load increase over time, multiplying the input elements based on the elapsed time since the - * start of processing. This class aims to simulate various load levels during stress testing. - */ - private static class MultiplierDoFn extends DoFn<byte[], byte[]> { - private final int startMultiplier; - private final long startTimesMillis; - private final List<LoadPeriod> loadPeriods; - - MultiplierDoFn(int startMultiplier, List<LoadPeriod> loadPeriods) { - this.startMultiplier = startMultiplier; - this.startTimesMillis = Instant.now().getMillis(); - this.loadPeriods = loadPeriods; - } - - @DoFn.ProcessElement - public void processElement( - @Element byte[] element, - OutputReceiver<byte[]> outputReceiver, - @DoFn.Timestamp Instant timestamp) { - - int multiplier = this.startMultiplier; - long elapsedTimeMillis = timestamp.getMillis() - startTimesMillis; - - for (LoadPeriod loadPeriod : loadPeriods) { - if (elapsedTimeMillis >= loadPeriod.getPeriodStartMillis() - && elapsedTimeMillis < loadPeriod.getPeriodEndMillis()) { - multiplier *= loadPeriod.getLoadIncreaseMultiplier(); - break; - } - } - for (int i = 0; i < multiplier; i++) { - outputReceiver.output(element); - } - } - } - - /** - * Generates and returns a list of LoadPeriod instances representing periods of load increase - * based on the specified load increase array and total duration in minutes. - * - * @param minutesTotal The total duration in minutes for which the load periods are generated. - * @return A list of LoadPeriod instances defining periods of load increase. - */ - private List<LoadPeriod> getLoadPeriods(int minutesTotal, int[] loadIncreaseArray) { - - List<LoadPeriod> loadPeriods = new ArrayList<>(); - long periodDurationMillis = - Duration.ofMinutes(minutesTotal / loadIncreaseArray.length).toMillis(); - long startTimeMillis = 0; - - for (int loadIncreaseMultiplier : loadIncreaseArray) { - long endTimeMillis = startTimeMillis + periodDurationMillis; - loadPeriods.add(new LoadPeriod(loadIncreaseMultiplier, startTimeMillis, endTimeMillis)); - - startTimeMillis = endTimeMillis; - } - return loadPeriods; - } - /** Options for Kafka IO stress test. */ static class Configuration extends SyntheticSourceOptions { /** Pipeline timeout in minutes. Must be a positive value. */ @@ -415,6 +325,12 @@ public final class KafkaIOST extends IOLoadTestBase { /** Runner specified to run the pipeline. */ @JsonProperty public String runner = "DirectRunner"; + /** + * Determines whether to use Dataflow runner v2. If set to true, it uses SDF mode for reading + * from Kafka. Otherwise, Unbounded mode will be used. + */ + @JsonProperty public boolean useDataflowRunnerV2 = true; + /** Number of workers for the pipeline. */ @JsonProperty public int numWorkers = 20; @@ -449,32 +365,4 @@ public final class KafkaIOST extends IOLoadTestBase { /** InfluxDB database to publish metrics. * */ @JsonProperty public String influxDatabase; } - - /** - * Represents a period of time with associated load increase properties for stress testing - * scenarios. - */ - private static class LoadPeriod implements Serializable { - private final int loadIncreaseMultiplier; - private final long periodStartMillis; - private final long periodEndMillis; - - public LoadPeriod(int loadIncreaseMultiplier, long periodStartMillis, long periodEndMin) { - this.loadIncreaseMultiplier = loadIncreaseMultiplier; - this.periodStartMillis = periodStartMillis; - this.periodEndMillis = periodEndMin; - } - - public int getLoadIncreaseMultiplier() { - return loadIncreaseMultiplier; - } - - public long getPeriodStartMillis() { - return periodStartMillis; - } - - public long getPeriodEndMillis() { - return periodEndMillis; - } - } } diff --git a/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml b/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml index e0c53436d4e..824aa901078 100644 --- a/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml +++ b/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml @@ -86,6 +86,7 @@ <suppress id="ForbidNonVendoredGrpcProtobuf" files=".*it.*Base\.java" /> <suppress id="ForbidNonVendoredGrpcProtobuf" files=".*it.*Client\.java" /> <suppress id="ForbidNonVendoredGrpcProtobuf" files=".*it.*LT\.java" /> + <suppress id="ForbidNonVendoredGrpcProtobuf" files=".*it.*ST\.java" /> <suppress id="ForbidNonVendoredGrpcProtobuf" files=".*it.*ResourceManagerTest\.java" /> <suppress id="ForbidNonVendoredGrpcProtobuf" files=".*testinfra.*mockapis.*" /> <suppress id="ForbidNonVendoredGrpcProtobuf" files=".*requestresponse.*" />