This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new b952b41788a Python TextIO Performance Test (#23951)
b952b41788a is described below
commit b952b41788acc20edbe5b75b2196f30dbf8fdeb0
Author: Yi Hu <[email protected]>
AuthorDate: Wed Nov 16 14:18:12 2022 -0500
Python TextIO Performance Test (#23951)
* Python TextIO Performance Test
* Add filebasedio_perf_test module for unified test framework for
Python file-based IOs
* Fix MetricsReader publishing duplicate metrics when more than one
load test is declared. This happened because MetricsReader.publishers
was a static class variable
* Fix pylint
* Distribute Python performance tests at random times of day instead of
all at 3PM
* Add information about length conversion
---
.../job_PerformanceTests_BigQueryIO_Python.groovy | 4 +-
.../job_PerformanceTests_FileBasedIO_Python.groovy | 81 +++++++++
.../job_PerformanceTests_PubsubIO_Python.groovy | 2 +-
.../job_PerformanceTests_SpannerIO_Python.groovy | 4 +-
.../Python_IO_IT_Tests_Dataflow.json | 122 +++++++++++++
.../python/apache_beam/io/filebasedio_perf_test.py | 188 +++++++++++++++++++++
.../apache_beam/testing/load_tests/load_test.py | 4 +-
.../testing/load_tests/load_test_metrics_utils.py | 16 +-
.../apache_beam/testing/synthetic_pipeline.py | 2 +-
9 files changed, 410 insertions(+), 13 deletions(-)
diff --git a/.test-infra/jenkins/job_PerformanceTests_BigQueryIO_Python.groovy
b/.test-infra/jenkins/job_PerformanceTests_BigQueryIO_Python.groovy
index 1ccb8238ba8..853347f9ebf 100644
--- a/.test-infra/jenkins/job_PerformanceTests_BigQueryIO_Python.groovy
+++ b/.test-infra/jenkins/job_PerformanceTests_BigQueryIO_Python.groovy
@@ -90,7 +90,7 @@ PhraseTriggeringPostCommitBuilder.postCommitJob(
executeJob(delegate, bqio_read_test)
}
-CronJobBuilder.cronJob('beam_PerformanceTests_BiqQueryIO_Read_Python', 'H 15 *
* *', this) {
+CronJobBuilder.cronJob('beam_PerformanceTests_BiqQueryIO_Read_Python', 'H H *
* *', this) {
executeJob(delegate, bqio_read_test)
}
@@ -103,6 +103,6 @@ PhraseTriggeringPostCommitBuilder.postCommitJob(
executeJob(delegate, bqio_write_test)
}
-CronJobBuilder.cronJob('beam_PerformanceTests_BiqQueryIO_Write_Python_Batch',
'H 15 * * *', this) {
+CronJobBuilder.cronJob('beam_PerformanceTests_BiqQueryIO_Write_Python_Batch',
'H H * * *', this) {
executeJob(delegate, bqio_write_test)
}
diff --git a/.test-infra/jenkins/job_PerformanceTests_FileBasedIO_Python.groovy
b/.test-infra/jenkins/job_PerformanceTests_FileBasedIO_Python.groovy
new file mode 100644
index 00000000000..e45beadf321
--- /dev/null
+++ b/.test-infra/jenkins/job_PerformanceTests_FileBasedIO_Python.groovy
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import CommonJobProperties as common
+import LoadTestsBuilder as loadTestsBuilder
+import InfluxDBCredentialsHelper
+
+def now = new Date().format("MMddHHmmss", TimeZone.getTimeZone('UTC'))
+
+def jobs = [
+ [
+ name : 'beam_PerformanceTests_TextIOIT_Python',
+ description : 'Runs performance tests for Python TextIOIT',
+ test : 'apache_beam.io.filebasedio_perf_test',
+ githubTitle : 'Python TextIO Performance Test',
+ githubTriggerPhrase: 'Run Python TextIO Performance Test',
+ pipelineOptions : [
+ publish_to_big_query : true,
+ metrics_dataset : 'beam_performance',
+ metrics_table : 'python_textio_1GB_results',
+ influx_measurement : 'python_textio_1GB_results',
+ test_class : 'TextIOPerfTest',
+ input_options : '\'{' +
+ '"num_records": 25000000,' +
+ '"key_size": 9,' +
+ '"value_size": 21}\'',
+ dataset_size : '1050000000',
+ num_workers : '5',
+ autoscaling_algorithm: 'NONE'
+ ]
+ ]
+]
+
+jobs.findAll {
+ it.name in [
+ 'beam_PerformanceTests_TextIOIT_Python',
+ ]
+}.forEach { testJob -> createGCSFileBasedIOITTestJob(testJob) }
+
+private void createGCSFileBasedIOITTestJob(testJob) {
+ job(testJob.name) {
+ description(testJob.description)
+ common.setTopLevelMainJobProperties(delegate)
+ common.enablePhraseTriggeringFromPullRequest(delegate,
testJob.githubTitle, testJob.githubTriggerPhrase)
+ common.setAutoJob(delegate, 'H H * * *')
+ InfluxDBCredentialsHelper.useCredentials(delegate)
+ additionalPipelineArgs = [
+ influxDatabase: InfluxDBCredentialsHelper.InfluxDBDatabaseName,
+ influxHost: InfluxDBCredentialsHelper.InfluxDBHostUrl,
+ ]
+ testJob.pipelineOptions.putAll(additionalPipelineArgs)
+
+ def dataflowSpecificOptions = [
+ runner : 'DataflowRunner',
+ project : 'apache-beam-testing',
+ region : 'us-central1',
+ temp_location : 'gs://temp-storage-for-perf-tests/',
+ filename_prefix :
"gs://temp-storage-for-perf-tests/${testJob.name}/\${BUILD_ID}/",
+ ]
+
+ Map allPipelineOptions = dataflowSpecificOptions << testJob.pipelineOptions
+
+ loadTestsBuilder.loadTest(
+ delegate, testJob.name, CommonTestProperties.Runner.DATAFLOW,
CommonTestProperties.SDK.PYTHON, allPipelineOptions, testJob.test)
+ }
+}
diff --git a/.test-infra/jenkins/job_PerformanceTests_PubsubIO_Python.groovy
b/.test-infra/jenkins/job_PerformanceTests_PubsubIO_Python.groovy
index 327e93f392f..262eda3fd90 100644
--- a/.test-infra/jenkins/job_PerformanceTests_PubsubIO_Python.groovy
+++ b/.test-infra/jenkins/job_PerformanceTests_PubsubIO_Python.groovy
@@ -70,6 +70,6 @@ PhraseTriggeringPostCommitBuilder.postCommitJob(
executeJob(delegate, psio_test)
}
-CronJobBuilder.cronJob('beam_PerformanceTests_PubsubIOIT_Python_Streaming', 'H
15 * * *', this) {
+CronJobBuilder.cronJob('beam_PerformanceTests_PubsubIOIT_Python_Streaming', 'H
H * * *', this) {
executeJob(delegate, psio_test)
}
diff --git a/.test-infra/jenkins/job_PerformanceTests_SpannerIO_Python.groovy
b/.test-infra/jenkins/job_PerformanceTests_SpannerIO_Python.groovy
index 489c72ebaa2..41618656707 100644
--- a/.test-infra/jenkins/job_PerformanceTests_SpannerIO_Python.groovy
+++ b/.test-infra/jenkins/job_PerformanceTests_SpannerIO_Python.groovy
@@ -92,7 +92,7 @@ PhraseTriggeringPostCommitBuilder.postCommitJob(
executeJob(delegate, spannerio_read_test_2gb)
}
-CronJobBuilder.cronJob('beam_PerformanceTests_SpannerIO_Read_2GB_Python', 'H
15 * * *', this) {
+CronJobBuilder.cronJob('beam_PerformanceTests_SpannerIO_Read_2GB_Python', 'H H
* * *', this) {
executeJob(delegate, spannerio_read_test_2gb)
}
@@ -105,6 +105,6 @@ PhraseTriggeringPostCommitBuilder.postCommitJob(
executeJob(delegate, spannerio_write_test_2gb)
}
-CronJobBuilder.cronJob('beam_PerformanceTests_SpannerIO_Write_2GB_Python_Batch',
'H 15 * * *', this) {
+CronJobBuilder.cronJob('beam_PerformanceTests_SpannerIO_Write_2GB_Python_Batch',
'H H * * *', this) {
executeJob(delegate, spannerio_write_test_2gb)
}
diff --git
a/.test-infra/metrics/grafana/dashboards/perftests_metrics/Python_IO_IT_Tests_Dataflow.json
b/.test-infra/metrics/grafana/dashboards/perftests_metrics/Python_IO_IT_Tests_Dataflow.json
index 5b1ff2b8103..6db7a46edb5 100644
---
a/.test-infra/metrics/grafana/dashboards/perftests_metrics/Python_IO_IT_Tests_Dataflow.json
+++
b/.test-infra/metrics/grafana/dashboards/perftests_metrics/Python_IO_IT_Tests_Dataflow.json
@@ -482,6 +482,128 @@
"align": false,
"alignLevel": null
}
+ },
+ {
+ "aliasColors": {},
+ "bars": false,
+ "cacheTimeout": null,
+ "dashLength": 10,
+ "dashes": false,
+ "datasource": "BeamInfluxDB",
+ "fill": 1,
+ "fillGradient": 0,
+ "gridPos": {
+ "h": 9,
+ "w": 12,
+ "x": 12,
+ "y": 9
+ },
+ "hiddenSeries": false,
+ "id": 6,
+ "interval": "24h",
+ "legend": {
+ "avg": false,
+ "current": false,
+ "max": false,
+ "min": false,
+ "show": false,
+ "total": false,
+ "values": false
+ },
+ "lines": true,
+ "linewidth": 2,
+ "links": [],
+ "nullPointMode": "connected",
+ "options": {
+ "dataLinks": []
+ },
+ "percentage": false,
+ "pluginVersion": "6.7.2",
+ "pointradius": 2,
+ "points": true,
+ "renderer": "flot",
+ "seriesOverrides": [],
+ "spaceLength": 10,
+ "stack": false,
+ "steppedLine": false,
+ "targets": [
+ {
+ "alias": "$tag_metric",
+ "groupBy": [
+ {
+ "params": [
+ "$__interval"
+ ],
+ "type": "time"
+ }
+ ],
+ "measurement": "",
+ "orderByTime": "ASC",
+ "policy": "default",
+          "query": "SELECT mean(\"value\") FROM \"python_textio_1GB_results\"
WHERE (\"metric\" = 'read_runtime' OR \"metric\" = 'write_runtime') AND
$timeFilter GROUP BY time($__interval), \"metric\"",
+ "rawQuery": true,
+ "refId": "A",
+ "resultFormat": "time_series",
+ "select": [
+ [
+ {
+ "params": [
+ "value"
+ ],
+ "type": "field"
+ },
+ {
+ "params": [],
+ "type": "mean"
+ }
+ ]
+ ],
+ "tags": []
+ }
+ ],
+ "thresholds": [],
+ "timeFrom": null,
+ "timeRegions": [],
+ "timeShift": null,
+ "title": "TextIO | GCS | 1 GB",
+ "tooltip": {
+ "shared": true,
+ "sort": 0,
+ "value_type": "individual"
+ },
+ "transparent": true,
+ "type": "graph",
+ "xaxis": {
+ "buckets": null,
+ "mode": "time",
+ "name": null,
+ "show": true,
+ "values": []
+ },
+ "yaxes": [
+ {
+ "$$hashKey": "object:403",
+ "format": "s",
+ "label": null,
+ "logBase": 1,
+ "max": null,
+ "min": null,
+ "show": true
+ },
+ {
+ "$$hashKey": "object:404",
+ "format": "short",
+ "label": null,
+ "logBase": 1,
+ "max": null,
+ "min": null,
+ "show": true
+ }
+ ],
+ "yaxis": {
+ "align": false,
+ "alignLevel": null
+ }
}
],
"schemaVersion": 22,
diff --git a/sdks/python/apache_beam/io/filebasedio_perf_test.py
b/sdks/python/apache_beam/io/filebasedio_perf_test.py
new file mode 100644
index 00000000000..7d5b673098d
--- /dev/null
+++ b/sdks/python/apache_beam/io/filebasedio_perf_test.py
@@ -0,0 +1,188 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Performance tests for file based io connectors."""
+
+import logging
+import sys
+import uuid
+from typing import Tuple
+
+import apache_beam as beam
+from apache_beam import typehints
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.io.iobase import Read
+from apache_beam.io.textio import ReadFromText
+from apache_beam.io.textio import WriteToText
+from apache_beam.testing.load_tests.load_test import LoadTest
+from apache_beam.testing.load_tests.load_test import LoadTestOptions
+from apache_beam.testing.load_tests.load_test_metrics_utils import
CountMessages
+from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime
+from apache_beam.testing.synthetic_pipeline import SyntheticSource
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+WRITE_NAMESPACE = 'write'
+READ_NAMESPACE = 'read'
+
+_LOGGER = logging.getLogger(__name__)
+
+
+class FileBasedIOTestOptions(LoadTestOptions):
+ @classmethod
+ def _add_argparse_args(cls, parser):
+ parser.add_argument(
+ '--test_class', required=True, help='Test class to run.')
+ parser.add_argument(
+ '--filename_prefix',
+ required=True,
+ help='Destination prefix for files generated by the test.')
+ parser.add_argument(
+ '--compression_type',
+ default='auto',
+ help='File compression type for writing and reading test files.')
+ parser.add_argument(
+ '--number_of_shards',
+ type=int,
+ default=0,
+ help='Number of files this test will create during the write phase.')
+ parser.add_argument(
+ '--dataset_size',
+ type=int,
+ help='Size of data saved on the target filesystem (bytes).')
+
+
[email protected]_output_types(bytes)
[email protected]_input_types(Tuple[bytes, bytes])
+class SyntheticRecordToStrFn(beam.DoFn):
+ """
+ A DoFn that convert key-value bytes from synthetic source to string record.
+
+ It uses base64 to convert random bytes emitted from the synthetic source.
+ Therefore, every 3 bytes give 4 bytes long ascii characters.
+
+ Output length = 4(ceil[len(key)/3] + ceil[len(value)/3]) + 1
+ """
+ def process(self, element):
+ import base64
+ yield base64.b64encode(element[0]) + b' ' + base64.b64encode(element[1])
+
+
+class CreateFolderFn(beam.DoFn):
+ """Create folder at pipeline runtime."""
+ def __init__(self, folder):
+ self.folder = folder
+
+ def process(self, element):
+ from apache_beam.io.filesystems import FileSystems # pylint:
disable=reimported
+ filesystem = FileSystems.get_filesystem(self.folder)
+ if filesystem.has_dirs() and not filesystem.exists(self.folder):
+ filesystem.mkdirs(self.folder)
+
+
+class TextIOPerfTest:
+ def run(self):
+ write_test = _TextIOWritePerfTest(need_cleanup=False)
+ read_test = _TextIOReadPerfTest(input_folder=write_test.output_folder)
+ write_test.run()
+ read_test.run()
+
+
+class _TextIOWritePerfTest(LoadTest):
+ def __init__(self, need_cleanup=True):
+ super().__init__(WRITE_NAMESPACE)
+ self.need_cleanup = need_cleanup
+ self.test_options = self.pipeline.get_pipeline_options().view_as(
+ FileBasedIOTestOptions)
+ self.output_folder = FileSystems.join(
+ self.test_options.filename_prefix, str(uuid.uuid4()))
+
+ def test(self):
+ # first makedir if needed
+ _ = (
+ self.pipeline
+ | beam.Impulse()
+ | beam.ParDo(CreateFolderFn(self.output_folder)))
+
+ # write to text
+ _ = (
+ self.pipeline
+ | 'Produce rows' >> Read(
+ SyntheticSource(self.parse_synthetic_source_options()))
+ | 'Count records' >> beam.ParDo(CountMessages(self.metrics_namespace))
+ | 'Format' >> beam.ParDo(SyntheticRecordToStrFn())
+ | 'Measure time' >> beam.ParDo(MeasureTime(self.metrics_namespace))
+ | 'Write Text' >> WriteToText(
+ file_path_prefix=FileSystems.join(self.output_folder, 'test'),
+ compression_type=self.test_options.compression_type,
+ num_shards=self.test_options.number_of_shards))
+
+ def cleanup(self):
+ if not self.need_cleanup:
+ return
+ try:
+ FileSystems.delete([self.output_folder])
+ except IOError:
+ # may not have delete permission, just raise a warning
+ _LOGGER.warning(
+ 'Unable to delete file %s during cleanup.', self.output_folder)
+
+
+class _TextIOReadPerfTest(LoadTest):
+ def __init__(self, input_folder):
+ super().__init__(READ_NAMESPACE)
+ self.test_options = self.pipeline.get_pipeline_options().view_as(
+ FileBasedIOTestOptions)
+ self.input_folder = input_folder
+
+ def test(self):
+ output = (
+ self.pipeline
+ | 'Read from text' >>
+ ReadFromText(file_pattern=FileSystems.join(self.input_folder, '*'))
+ | 'Count records' >> beam.ParDo(CountMessages(self.metrics_namespace))
+ | 'Measure time' >> beam.ParDo(MeasureTime(self.metrics_namespace))
+ | 'Count' >> beam.combiners.Count.Globally())
+ assert_that(output, equal_to([self.input_options['num_records']]))
+
+ def cleanup(self):
+ try:
+ #FileSystems.delete([self.input_folder])
+ pass
+ except IOError:
+ # may not have delete permission, just raise a warning
+ _LOGGER.warning(
+ 'Unable to delete file %s during cleanup.', self.input_folder)
+
+
+if __name__ == '__main__':
+ logging.basicConfig(level=logging.INFO)
+
+ test_options = TestPipeline().get_pipeline_options().view_as(
+ FileBasedIOTestOptions)
+ supported_test_classes = list(
+ filter(
+ lambda s: s.endswith('PerfTest') and not s.startswith('_'),
+ dir(sys.modules[__name__])))
+
+ if test_options.test_class not in supported_test_classes:
+ raise RuntimeError(
+ f'Test {test_options.test_class} not found. '
+        f'Supported tests are {supported_test_classes}')
+
+ getattr(sys.modules[__name__], test_options.test_class)().run()
diff --git a/sdks/python/apache_beam/testing/load_tests/load_test.py
b/sdks/python/apache_beam/testing/load_tests/load_test.py
index f5917fbfba2..3112c12ab86 100644
--- a/sdks/python/apache_beam/testing/load_tests/load_test.py
+++ b/sdks/python/apache_beam/testing/load_tests/load_test.py
@@ -25,6 +25,7 @@ import sys
from apache_beam.metrics import MetricsFilter
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.runners.runner import PipelineState
from apache_beam.testing.load_tests.load_test_metrics_utils import
InfluxDBMetricsPublisherOptions
from apache_beam.testing.load_tests.load_test_metrics_utils import
MetricsReader
from apache_beam.testing.test_pipeline import TestPipeline
@@ -148,7 +149,8 @@ class LoadTest(object):
if not hasattr(self, 'result'):
self.result = self.pipeline.run()
# Defaults to waiting forever, unless timeout_ms has been set
- self.result.wait_until_finish(duration=self.timeout_ms)
+ state = self.result.wait_until_finish(duration=self.timeout_ms)
+ assert state != PipelineState.FAILED
self._metrics_monitor.publish_metrics(self.result, self.extra_metrics)
finally:
self.cleanup()
diff --git
a/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py
b/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py
index 9c6ef2a935e..fbca1cb96e9 100644
--- a/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py
+++ b/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py
@@ -33,7 +33,6 @@ import json
import logging
import time
import uuid
-from typing import Any
from typing import List
from typing import Mapping
from typing import Optional
@@ -185,8 +184,6 @@ class MetricsReader(object):
A :class:`MetricsReader` retrieves metrics from pipeline result,
prepares it for publishers and setup publishers.
"""
- publishers = [] # type: List[Any]
-
def __init__(
self,
project_name=None,
@@ -206,6 +203,7 @@ class MetricsReader(object):
filters: MetricFilter to query only filtered metrics
"""
self._namespace = namespace
+ self.publishers: List[MetricsPublisher] = []
self.publishers.append(ConsoleMetricsPublisher())
check = project_name and bq_table and bq_dataset and publish_to_bq
@@ -385,7 +383,13 @@ class RuntimeMetric(Metric):
return runtime_in_s
-class ConsoleMetricsPublisher(object):
+class MetricsPublisher:
+ """Base class for metrics publishers."""
+ def publish(self, results):
+ raise NotImplementedError
+
+
+class ConsoleMetricsPublisher(MetricsPublisher):
"""A :class:`ConsoleMetricsPublisher` publishes collected metrics
to console output."""
def publish(self, results):
@@ -401,7 +405,7 @@ class ConsoleMetricsPublisher(object):
_LOGGER.info("No test results were collected.")
-class BigQueryMetricsPublisher(object):
+class BigQueryMetricsPublisher(MetricsPublisher):
"""A :class:`BigQueryMetricsPublisher` publishes collected metrics
to BigQuery output."""
def __init__(self, project_name, table, dataset):
@@ -484,7 +488,7 @@ class InfluxDBMetricsPublisherOptions(object):
return self.user is not None and self.password is not None
-class InfluxDBMetricsPublisher(object):
+class InfluxDBMetricsPublisher(MetricsPublisher):
"""Publishes collected metrics to InfluxDB database."""
def __init__(
self,
diff --git a/sdks/python/apache_beam/testing/synthetic_pipeline.py
b/sdks/python/apache_beam/testing/synthetic_pipeline.py
index 305e4229486..a520b31cb9f 100644
--- a/sdks/python/apache_beam/testing/synthetic_pipeline.py
+++ b/sdks/python/apache_beam/testing/synthetic_pipeline.py
@@ -22,7 +22,7 @@ Exact shape of the pipeline and the behaviour of sources and
steps can be
controlled through arguments. Please see function 'parse_args()' for more
details about the arguments.
-Shape of the pipeline is primariy controlled through two arguments. Argument
+Shape of the pipeline is primarily controlled through two arguments. Argument
'steps' can be used to define a list of steps as a JSON string. Argument
'barrier' describes how these steps are separated from each other. Argument
'barrier' can be use to build a pipeline as a series of steps or a tree of