This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b952b41788a Python TextIO Performance Test (#23951)
b952b41788a is described below

commit b952b41788acc20edbe5b75b2196f30dbf8fdeb0
Author: Yi Hu <[email protected]>
AuthorDate: Wed Nov 16 14:18:12 2022 -0500

    Python TextIO Performance Test (#23951)
    
    * Python TextIO Performance Test
    
    * Add filebasedio_perf_test module for unified test framework for
      Python file-based IOs
    
    * Fix MetricsReader publishing duplicate metrics when more than one
      load test is declared. This happened because
      MetricsReader.publishers was a static class variable
    
    * Fix pylint
    
    * Distribute Python performance tests at random times of day instead of
      all at 3PM
    
    * Add information about length conversion
---
 .../job_PerformanceTests_BigQueryIO_Python.groovy  |   4 +-
 .../job_PerformanceTests_FileBasedIO_Python.groovy |  81 +++++++++
 .../job_PerformanceTests_PubsubIO_Python.groovy    |   2 +-
 .../job_PerformanceTests_SpannerIO_Python.groovy   |   4 +-
 .../Python_IO_IT_Tests_Dataflow.json               | 122 +++++++++++++
 .../python/apache_beam/io/filebasedio_perf_test.py | 188 +++++++++++++++++++++
 .../apache_beam/testing/load_tests/load_test.py    |   4 +-
 .../testing/load_tests/load_test_metrics_utils.py  |  16 +-
 .../apache_beam/testing/synthetic_pipeline.py      |   2 +-
 9 files changed, 410 insertions(+), 13 deletions(-)

diff --git a/.test-infra/jenkins/job_PerformanceTests_BigQueryIO_Python.groovy 
b/.test-infra/jenkins/job_PerformanceTests_BigQueryIO_Python.groovy
index 1ccb8238ba8..853347f9ebf 100644
--- a/.test-infra/jenkins/job_PerformanceTests_BigQueryIO_Python.groovy
+++ b/.test-infra/jenkins/job_PerformanceTests_BigQueryIO_Python.groovy
@@ -90,7 +90,7 @@ PhraseTriggeringPostCommitBuilder.postCommitJob(
       executeJob(delegate, bqio_read_test)
     }
 
-CronJobBuilder.cronJob('beam_PerformanceTests_BiqQueryIO_Read_Python', 'H 15 * 
* *', this) {
+CronJobBuilder.cronJob('beam_PerformanceTests_BiqQueryIO_Read_Python', 'H H * 
* *', this) {
   executeJob(delegate, bqio_read_test)
 }
 
@@ -103,6 +103,6 @@ PhraseTriggeringPostCommitBuilder.postCommitJob(
       executeJob(delegate, bqio_write_test)
     }
 
-CronJobBuilder.cronJob('beam_PerformanceTests_BiqQueryIO_Write_Python_Batch', 
'H 15 * * *', this) {
+CronJobBuilder.cronJob('beam_PerformanceTests_BiqQueryIO_Write_Python_Batch', 
'H H * * *', this) {
   executeJob(delegate, bqio_write_test)
 }
diff --git a/.test-infra/jenkins/job_PerformanceTests_FileBasedIO_Python.groovy 
b/.test-infra/jenkins/job_PerformanceTests_FileBasedIO_Python.groovy
new file mode 100644
index 00000000000..e45beadf321
--- /dev/null
+++ b/.test-infra/jenkins/job_PerformanceTests_FileBasedIO_Python.groovy
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import CommonJobProperties as common
+import LoadTestsBuilder as loadTestsBuilder
+import InfluxDBCredentialsHelper
+
+def now = new Date().format("MMddHHmmss", TimeZone.getTimeZone('UTC'))
+
+def jobs = [
+  [
+    name               : 'beam_PerformanceTests_TextIOIT_Python',
+    description        : 'Runs performance tests for Python TextIOIT',
+    test               : 'apache_beam.io.filebasedio_perf_test',
+    githubTitle        : 'Python TextIO Performance Test',
+    githubTriggerPhrase: 'Run Python TextIO Performance Test',
+    pipelineOptions    : [
+      publish_to_big_query : true,
+      metrics_dataset      : 'beam_performance',
+      metrics_table        : 'python_textio_1GB_results',
+      influx_measurement   : 'python_textio_1GB_results',
+      test_class           : 'TextIOPerfTest',
+      input_options        : '\'{' +
+      '"num_records": 25000000,' +
+      '"key_size": 9,' +
+      '"value_size": 21}\'',
+      dataset_size         : '1050000000',
+      num_workers          : '5',
+      autoscaling_algorithm: 'NONE'
+    ]
+  ]
+]
+
+jobs.findAll {
+  it.name in [
+    'beam_PerformanceTests_TextIOIT_Python',
+  ]
+}.forEach { testJob -> createGCSFileBasedIOITTestJob(testJob) }
+
+private void createGCSFileBasedIOITTestJob(testJob) {
+  job(testJob.name) {
+    description(testJob.description)
+    common.setTopLevelMainJobProperties(delegate)
+    common.enablePhraseTriggeringFromPullRequest(delegate, 
testJob.githubTitle, testJob.githubTriggerPhrase)
+    common.setAutoJob(delegate, 'H H * * *')
+    InfluxDBCredentialsHelper.useCredentials(delegate)
+    additionalPipelineArgs = [
+      influxDatabase: InfluxDBCredentialsHelper.InfluxDBDatabaseName,
+      influxHost: InfluxDBCredentialsHelper.InfluxDBHostUrl,
+    ]
+    testJob.pipelineOptions.putAll(additionalPipelineArgs)
+
+    def dataflowSpecificOptions = [
+      runner          : 'DataflowRunner',
+      project         : 'apache-beam-testing',
+      region          : 'us-central1',
+      temp_location   : 'gs://temp-storage-for-perf-tests/',
+      filename_prefix : 
"gs://temp-storage-for-perf-tests/${testJob.name}/\${BUILD_ID}/",
+    ]
+
+    Map allPipelineOptions = dataflowSpecificOptions << testJob.pipelineOptions
+
+    loadTestsBuilder.loadTest(
+        delegate, testJob.name, CommonTestProperties.Runner.DATAFLOW, 
CommonTestProperties.SDK.PYTHON, allPipelineOptions, testJob.test)
+  }
+}
diff --git a/.test-infra/jenkins/job_PerformanceTests_PubsubIO_Python.groovy 
b/.test-infra/jenkins/job_PerformanceTests_PubsubIO_Python.groovy
index 327e93f392f..262eda3fd90 100644
--- a/.test-infra/jenkins/job_PerformanceTests_PubsubIO_Python.groovy
+++ b/.test-infra/jenkins/job_PerformanceTests_PubsubIO_Python.groovy
@@ -70,6 +70,6 @@ PhraseTriggeringPostCommitBuilder.postCommitJob(
       executeJob(delegate, psio_test)
     }
 
-CronJobBuilder.cronJob('beam_PerformanceTests_PubsubIOIT_Python_Streaming', 'H 
15 * * *', this) {
+CronJobBuilder.cronJob('beam_PerformanceTests_PubsubIOIT_Python_Streaming', 'H 
H * * *', this) {
   executeJob(delegate, psio_test)
 }
diff --git a/.test-infra/jenkins/job_PerformanceTests_SpannerIO_Python.groovy 
b/.test-infra/jenkins/job_PerformanceTests_SpannerIO_Python.groovy
index 489c72ebaa2..41618656707 100644
--- a/.test-infra/jenkins/job_PerformanceTests_SpannerIO_Python.groovy
+++ b/.test-infra/jenkins/job_PerformanceTests_SpannerIO_Python.groovy
@@ -92,7 +92,7 @@ PhraseTriggeringPostCommitBuilder.postCommitJob(
       executeJob(delegate, spannerio_read_test_2gb)
     }
 
-CronJobBuilder.cronJob('beam_PerformanceTests_SpannerIO_Read_2GB_Python', 'H 
15 * * *', this) {
+CronJobBuilder.cronJob('beam_PerformanceTests_SpannerIO_Read_2GB_Python', 'H H 
* * *', this) {
   executeJob(delegate, spannerio_read_test_2gb)
 }
 
@@ -105,6 +105,6 @@ PhraseTriggeringPostCommitBuilder.postCommitJob(
       executeJob(delegate, spannerio_write_test_2gb)
     }
 
-CronJobBuilder.cronJob('beam_PerformanceTests_SpannerIO_Write_2GB_Python_Batch',
 'H 15 * * *', this) {
+CronJobBuilder.cronJob('beam_PerformanceTests_SpannerIO_Write_2GB_Python_Batch',
 'H H * * *', this) {
   executeJob(delegate, spannerio_write_test_2gb)
 }
diff --git 
a/.test-infra/metrics/grafana/dashboards/perftests_metrics/Python_IO_IT_Tests_Dataflow.json
 
b/.test-infra/metrics/grafana/dashboards/perftests_metrics/Python_IO_IT_Tests_Dataflow.json
index 5b1ff2b8103..6db7a46edb5 100644
--- 
a/.test-infra/metrics/grafana/dashboards/perftests_metrics/Python_IO_IT_Tests_Dataflow.json
+++ 
b/.test-infra/metrics/grafana/dashboards/perftests_metrics/Python_IO_IT_Tests_Dataflow.json
@@ -482,6 +482,128 @@
         "align": false,
         "alignLevel": null
       }
+    },
+    {
+      "aliasColors": {},
+      "bars": false,
+      "cacheTimeout": null,
+      "dashLength": 10,
+      "dashes": false,
+      "datasource": "BeamInfluxDB",
+      "fill": 1,
+      "fillGradient": 0,
+      "gridPos": {
+        "h": 9,
+        "w": 12,
+        "x": 12,
+        "y": 9
+      },
+      "hiddenSeries": false,
+      "id": 6,
+      "interval": "24h",
+      "legend": {
+        "avg": false,
+        "current": false,
+        "max": false,
+        "min": false,
+        "show": false,
+        "total": false,
+        "values": false
+      },
+      "lines": true,
+      "linewidth": 2,
+      "links": [],
+      "nullPointMode": "connected",
+      "options": {
+        "dataLinks": []
+      },
+      "percentage": false,
+      "pluginVersion": "6.7.2",
+      "pointradius": 2,
+      "points": true,
+      "renderer": "flot",
+      "seriesOverrides": [],
+      "spaceLength": 10,
+      "stack": false,
+      "steppedLine": false,
+      "targets": [
+        {
+          "alias": "$tag_metric",
+          "groupBy": [
+            {
+              "params": [
+                "$__interval"
+              ],
+              "type": "time"
+            }
+          ],
+          "measurement": "",
+          "orderByTime": "ASC",
+          "policy": "default",
+          "query": "SELECT mean(\"value\") FROM \"python_textio_1GB_results\" 
WHERE (\"metric\" = 'read_runtime' OR \"metric\" = 'write_runtime') AND 
$timeFilter GROUP BY time($__interval), \"metric\"",
+          "rawQuery": true,
+          "refId": "A",
+          "resultFormat": "time_series",
+          "select": [
+            [
+              {
+                "params": [
+                  "value"
+                ],
+                "type": "field"
+              },
+              {
+                "params": [],
+                "type": "mean"
+              }
+            ]
+          ],
+          "tags": []
+        }
+      ],
+      "thresholds": [],
+      "timeFrom": null,
+      "timeRegions": [],
+      "timeShift": null,
+      "title": "TextIO | GCS | 1 GB",
+      "tooltip": {
+        "shared": true,
+        "sort": 0,
+        "value_type": "individual"
+      },
+      "transparent": true,
+      "type": "graph",
+      "xaxis": {
+        "buckets": null,
+        "mode": "time",
+        "name": null,
+        "show": true,
+        "values": []
+      },
+      "yaxes": [
+        {
+          "$$hashKey": "object:403",
+          "format": "s",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        },
+        {
+          "$$hashKey": "object:404",
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        }
+      ],
+      "yaxis": {
+        "align": false,
+        "alignLevel": null
+      }
     }
   ],
   "schemaVersion": 22,
diff --git a/sdks/python/apache_beam/io/filebasedio_perf_test.py 
b/sdks/python/apache_beam/io/filebasedio_perf_test.py
new file mode 100644
index 00000000000..7d5b673098d
--- /dev/null
+++ b/sdks/python/apache_beam/io/filebasedio_perf_test.py
@@ -0,0 +1,188 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Performance tests for file based io connectors."""
+
+import logging
+import sys
+import uuid
+from typing import Tuple
+
+import apache_beam as beam
+from apache_beam import typehints
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.io.iobase import Read
+from apache_beam.io.textio import ReadFromText
+from apache_beam.io.textio import WriteToText
+from apache_beam.testing.load_tests.load_test import LoadTest
+from apache_beam.testing.load_tests.load_test import LoadTestOptions
+from apache_beam.testing.load_tests.load_test_metrics_utils import 
CountMessages
+from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime
+from apache_beam.testing.synthetic_pipeline import SyntheticSource
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+WRITE_NAMESPACE = 'write'
+READ_NAMESPACE = 'read'
+
+_LOGGER = logging.getLogger(__name__)
+
+
+class FileBasedIOTestOptions(LoadTestOptions):
+  @classmethod
+  def _add_argparse_args(cls, parser):
+    parser.add_argument(
+        '--test_class', required=True, help='Test class to run.')
+    parser.add_argument(
+        '--filename_prefix',
+        required=True,
+        help='Destination prefix for files generated by the test.')
+    parser.add_argument(
+        '--compression_type',
+        default='auto',
+        help='File compression type for writing and reading test files.')
+    parser.add_argument(
+        '--number_of_shards',
+        type=int,
+        default=0,
+        help='Number of files this test will create during the write phase.')
+    parser.add_argument(
+        '--dataset_size',
+        type=int,
+        help='Size of data saved on the target filesystem (bytes).')
+
+
[email protected]_output_types(bytes)
[email protected]_input_types(Tuple[bytes, bytes])
+class SyntheticRecordToStrFn(beam.DoFn):
+  """
+  A DoFn that converts key-value bytes from synthetic source to string record.
+
+  It uses base64 to convert random bytes emitted from the synthetic source.
+  Therefore, every 3 bytes give 4 bytes long ascii characters.
+
+  Output length = 4(ceil[len(key)/3] + ceil[len(value)/3]) + 1
+  """
+  def process(self, element):
+    import base64
+    yield base64.b64encode(element[0]) + b' ' + base64.b64encode(element[1])
+
+
+class CreateFolderFn(beam.DoFn):
+  """Create folder at pipeline runtime."""
+  def __init__(self, folder):
+    self.folder = folder
+
+  def process(self, element):
+    from apache_beam.io.filesystems import FileSystems  # pylint: 
disable=reimported
+    filesystem = FileSystems.get_filesystem(self.folder)
+    if filesystem.has_dirs() and not filesystem.exists(self.folder):
+      filesystem.mkdirs(self.folder)
+
+
+class TextIOPerfTest:
+  def run(self):
+    write_test = _TextIOWritePerfTest(need_cleanup=False)
+    read_test = _TextIOReadPerfTest(input_folder=write_test.output_folder)
+    write_test.run()
+    read_test.run()
+
+
+class _TextIOWritePerfTest(LoadTest):
+  def __init__(self, need_cleanup=True):
+    super().__init__(WRITE_NAMESPACE)
+    self.need_cleanup = need_cleanup
+    self.test_options = self.pipeline.get_pipeline_options().view_as(
+        FileBasedIOTestOptions)
+    self.output_folder = FileSystems.join(
+        self.test_options.filename_prefix, str(uuid.uuid4()))
+
+  def test(self):
+    # first makedir if needed
+    _ = (
+        self.pipeline
+        | beam.Impulse()
+        | beam.ParDo(CreateFolderFn(self.output_folder)))
+
+    # write to text
+    _ = (
+        self.pipeline
+        | 'Produce rows' >> Read(
+            SyntheticSource(self.parse_synthetic_source_options()))
+        | 'Count records' >> beam.ParDo(CountMessages(self.metrics_namespace))
+        | 'Format' >> beam.ParDo(SyntheticRecordToStrFn())
+        | 'Measure time' >> beam.ParDo(MeasureTime(self.metrics_namespace))
+        | 'Write Text' >> WriteToText(
+            file_path_prefix=FileSystems.join(self.output_folder, 'test'),
+            compression_type=self.test_options.compression_type,
+            num_shards=self.test_options.number_of_shards))
+
+  def cleanup(self):
+    if not self.need_cleanup:
+      return
+    try:
+      FileSystems.delete([self.output_folder])
+    except IOError:
+      # may not have delete permission, just raise a warning
+      _LOGGER.warning(
+          'Unable to delete file %s during cleanup.', self.output_folder)
+
+
+class _TextIOReadPerfTest(LoadTest):
+  def __init__(self, input_folder):
+    super().__init__(READ_NAMESPACE)
+    self.test_options = self.pipeline.get_pipeline_options().view_as(
+        FileBasedIOTestOptions)
+    self.input_folder = input_folder
+
+  def test(self):
+    output = (
+        self.pipeline
+        | 'Read from text' >>
+        ReadFromText(file_pattern=FileSystems.join(self.input_folder, '*'))
+        | 'Count records' >> beam.ParDo(CountMessages(self.metrics_namespace))
+        | 'Measure time' >> beam.ParDo(MeasureTime(self.metrics_namespace))
+        | 'Count' >> beam.combiners.Count.Globally())
+    assert_that(output, equal_to([self.input_options['num_records']]))
+
+  def cleanup(self):
+    try:
+      #FileSystems.delete([self.input_folder])
+      pass
+    except IOError:
+      # may not have delete permission, just raise a warning
+      _LOGGER.warning(
+          'Unable to delete file %s during cleanup.', self.input_folder)
+
+
+if __name__ == '__main__':
+  logging.basicConfig(level=logging.INFO)
+
+  test_options = TestPipeline().get_pipeline_options().view_as(
+      FileBasedIOTestOptions)
+  # Runnable tests are the public module attributes named '*PerfTest'.
+  supported_test_classes = [
+      name for name in dir(sys.modules[__name__])
+      if name.endswith('PerfTest') and not name.startswith('_')
+  ]
+  if test_options.test_class not in supported_test_classes:
+    raise RuntimeError(
+        f'Test {test_options.test_class} not found. '
+        f'Supported tests are {supported_test_classes}')
+
+  getattr(sys.modules[__name__], test_options.test_class)().run()
diff --git a/sdks/python/apache_beam/testing/load_tests/load_test.py 
b/sdks/python/apache_beam/testing/load_tests/load_test.py
index f5917fbfba2..3112c12ab86 100644
--- a/sdks/python/apache_beam/testing/load_tests/load_test.py
+++ b/sdks/python/apache_beam/testing/load_tests/load_test.py
@@ -25,6 +25,7 @@ import sys
 from apache_beam.metrics import MetricsFilter
 from apache_beam.options.pipeline_options import GoogleCloudOptions
 from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.runners.runner import PipelineState
 from apache_beam.testing.load_tests.load_test_metrics_utils import 
InfluxDBMetricsPublisherOptions
 from apache_beam.testing.load_tests.load_test_metrics_utils import 
MetricsReader
 from apache_beam.testing.test_pipeline import TestPipeline
@@ -148,7 +149,8 @@ class LoadTest(object):
       if not hasattr(self, 'result'):
         self.result = self.pipeline.run()
         # Defaults to waiting forever, unless timeout_ms has been set
-        self.result.wait_until_finish(duration=self.timeout_ms)
+        state = self.result.wait_until_finish(duration=self.timeout_ms)
+        assert state != PipelineState.FAILED
       self._metrics_monitor.publish_metrics(self.result, self.extra_metrics)
     finally:
       self.cleanup()
diff --git 
a/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py 
b/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py
index 9c6ef2a935e..fbca1cb96e9 100644
--- a/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py
+++ b/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py
@@ -33,7 +33,6 @@ import json
 import logging
 import time
 import uuid
-from typing import Any
 from typing import List
 from typing import Mapping
 from typing import Optional
@@ -185,8 +184,6 @@ class MetricsReader(object):
   A :class:`MetricsReader` retrieves metrics from pipeline result,
   prepares it for publishers and setup publishers.
   """
-  publishers = []  # type: List[Any]
-
   def __init__(
       self,
       project_name=None,
@@ -206,6 +203,7 @@ class MetricsReader(object):
       filters: MetricFilter to query only filtered metrics
     """
     self._namespace = namespace
+    self.publishers: List[MetricsPublisher] = []
     self.publishers.append(ConsoleMetricsPublisher())
 
     check = project_name and bq_table and bq_dataset and publish_to_bq
@@ -385,7 +383,13 @@ class RuntimeMetric(Metric):
     return runtime_in_s
 
 
-class ConsoleMetricsPublisher(object):
+class MetricsPublisher:
+  """Base class for metrics publishers."""
+  def publish(self, results):
+    raise NotImplementedError
+
+
+class ConsoleMetricsPublisher(MetricsPublisher):
   """A :class:`ConsoleMetricsPublisher` publishes collected metrics
   to console output."""
   def publish(self, results):
@@ -401,7 +405,7 @@ class ConsoleMetricsPublisher(object):
       _LOGGER.info("No test results were collected.")
 
 
-class BigQueryMetricsPublisher(object):
+class BigQueryMetricsPublisher(MetricsPublisher):
   """A :class:`BigQueryMetricsPublisher` publishes collected metrics
   to BigQuery output."""
   def __init__(self, project_name, table, dataset):
@@ -484,7 +488,7 @@ class InfluxDBMetricsPublisherOptions(object):
     return self.user is not None and self.password is not None
 
 
-class InfluxDBMetricsPublisher(object):
+class InfluxDBMetricsPublisher(MetricsPublisher):
   """Publishes collected metrics to InfluxDB database."""
   def __init__(
       self,
diff --git a/sdks/python/apache_beam/testing/synthetic_pipeline.py 
b/sdks/python/apache_beam/testing/synthetic_pipeline.py
index 305e4229486..a520b31cb9f 100644
--- a/sdks/python/apache_beam/testing/synthetic_pipeline.py
+++ b/sdks/python/apache_beam/testing/synthetic_pipeline.py
@@ -22,7 +22,7 @@ Exact shape of the pipeline and the behaviour of sources and 
steps can be
 controlled through arguments. Please see function 'parse_args()' for more
 details about the arguments.
 
-Shape of the pipeline is primariy controlled through two arguments. Argument
+Shape of the pipeline is primarily controlled through two arguments. Argument
 'steps' can be used to define a list of steps as a JSON string. Argument
 'barrier' describes how these steps are separated from each other. Argument
 'barrier' can be use to build a pipeline as a series of steps or a tree of

Reply via email to