This is an automated email from the ASF dual-hosted git repository.
tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 02ac93ed697 Add DiskProvisionedIops/ThroughputMibps pipeline options
for the Python SDK (#38370)
02ac93ed697 is described below
commit 02ac93ed6977b52be21c9aef2924b6451a18c8c9
Author: bambadiouf1 <[email protected]>
AuthorDate: Thu May 7 14:13:48 2026 -0700
Add DiskProvisionedIops/ThroughputMibps pipeline options for the Python SDK
(#38370)
* Restore Java and Go changes for disk provisioned IOPS and throughput
* Add CHANGES.md entry for disk provisioned IOPS and throughput
* restore go changes
* initialize options map in dataflow job to prevent nil pointer exceptions
* go fmt
* add testDiskProvisionedOptionsConfig unit test
* Update pr id in changes.md
* add disk_provisioned_iops and disk_provisioned_throughput_mibps options
to pipeline configuration
* add Python support to disk IOPS and throughput pipeline options in
CHANGES.md
* revert manual changes
* ran gen_client
* gen client
* trigger postcommit_python
* Delete R74
* undo gen_client deletion
* delete change to trigger postcom
---
CHANGES.md | 3 +-
.../python/apache_beam/options/pipeline_options.py | 18 ++++
.../apache_beam/options/pipeline_options_test.py | 6 ++
.../runners/dataflow/internal/apiclient.py | 5 ++
.../runners/dataflow/internal/apiclient_test.py | 17 ++++
.../clients/dataflow/dataflow_v1b3_messages.py | 99 +++++++++++++---------
6 files changed, 104 insertions(+), 44 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 922c38ffef3..0db7fddba4f 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -70,8 +70,7 @@
## New Features / Improvements
-* Added support for setting disk provisioned IOPS and throughput in Dataflow runner via `--diskProvisionedIops` and `--diskProvisionedThroughputMibps` pipeline options (Java/Go) ([#38349](https://github.com/apache/beam/issues/38349)).
-* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
+* Added support for setting disk provisioned IOPS and throughput in Dataflow runner via `--diskProvisionedIops` and `--diskProvisionedThroughputMibps` pipeline options (Java/Go/Python) ([#38349](https://github.com/apache/beam/issues/38349)).
* TriggerStateMachineRunner changes from BitSetCoder to SentinelBitSetCoder to
encode finished bitset. SentinelBitSetCoder and BitSetCoder are state
compatible. Both coders can decode encoded bytes from the other coder
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index a79bddb21ab..265313cd013 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -1402,6 +1402,24 @@ class WorkerOptions(PipelineOptions):
dest='disk_type',
default=None,
help=('Specifies what type of persistent disk should be used.'))
+ parser.add_argument(
+ '--disk_provisioned_iops',
+ type=int,
+ default=None,
+ dest='disk_provisioned_iops',
+ help=(
+ 'The provisioned IOPS of the disk. If not set, the Dataflow service'
+ ' will choose a reasonable default.'),
+ )
+ parser.add_argument(
+ '--disk_provisioned_throughput_mibps',
+ type=int,
+ default=None,
+ dest='disk_provisioned_throughput_mibps',
+ help=(
+ 'The provisioned throughput of the disk in MiB/s. If not set, the'
+ ' Dataflow service will choose a reasonable default.'),
+ )
parser.add_argument(
'--worker_region',
default=None,
diff --git a/sdks/python/apache_beam/options/pipeline_options_test.py b/sdks/python/apache_beam/options/pipeline_options_test.py
index 215c44156ea..901f56b99cb 100644
--- a/sdks/python/apache_beam/options/pipeline_options_test.py
+++ b/sdks/python/apache_beam/options/pipeline_options_test.py
@@ -444,12 +444,18 @@ class PipelineOptionsTest(unittest.TestCase):
'abc',
'--disk_type',
'def',
+ '--disk_provisioned_iops',
+ '4000',
+ '--disk_provisioned_throughput_mibps',
+ '200',
'--element_processing_timeout_minutes',
'10',
])
worker_options = options.view_as(WorkerOptions)
self.assertEqual(worker_options.machine_type, 'abc')
self.assertEqual(worker_options.disk_type, 'def')
+ self.assertEqual(worker_options.disk_provisioned_iops, 4000)
+ self.assertEqual(worker_options.disk_provisioned_throughput_mibps, 200)
self.assertEqual(worker_options.element_processing_timeout_minutes, 10)
options = PipelineOptions(
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index 097523a5131..4a7c61901de 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -208,6 +208,11 @@ class Environment(object):
pool.diskSizeGb = self.worker_options.disk_size_gb
if self.worker_options.disk_type:
pool.diskType = self.worker_options.disk_type
+ if self.worker_options.disk_provisioned_iops is not None:
+ pool.diskProvisionedIops = self.worker_options.disk_provisioned_iops
+ if self.worker_options.disk_provisioned_throughput_mibps is not None:
+ pool.diskProvisionedThroughputMibps = (
+ self.worker_options.disk_provisioned_throughput_mibps)
if self.worker_options.zone:
pool.zone = self.worker_options.zone
if self.worker_options.network:
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
index 43f51d0b39f..909ce9fd117 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
@@ -644,6 +644,23 @@ class UtilTest(unittest.TestCase):
FAKE_PIPELINE_URL)
self.assertEqual(env.proto.workerPools[0].numThreadsPerWorker, 2)
+ def test_disk_provisioning_options(self):
+ pipeline_options = PipelineOptions([
+ '--temp_location',
+ 'gs://any-location/temp',
+ '--disk_provisioned_iops',
+ '4000',
+ '--disk_provisioned_throughput_mibps',
+ '200'
+ ])
+ env = apiclient.Environment([],
+ pipeline_options,
+ '2.0.0',
+ FAKE_PIPELINE_URL)
+ self.assertEqual(env.proto.workerPools[0].diskProvisionedIops, 4000)
+ self.assertEqual(
+ env.proto.workerPools[0].diskProvisionedThroughputMibps, 200)
+
@mock.patch(
'apache_beam.runners.dataflow.internal.apiclient.'
'beam_version.__version__',
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_messages.py b/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_messages.py
index 0c096e73c1a..582fb30b57b 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_messages.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_messages.py
@@ -2634,8 +2634,8 @@ class FlexTemplateRuntimeEnvironment(_messages.Message):
ipConfiguration: Configuration for VM IPs.
kmsKeyName: Name for the Cloud KMS key for the job. Key format is:
projects//locations//keyRings//cryptoKeys/
- launcherMachineType: The machine type to use for launching the job. The
- default is n1-standard-1.
+ launcherMachineType: The machine type to use for launching the job. If not
+ set, Dataflow will select a default machine type.
machineType: The machine type to use for the job. Defaults to the value
from the template if not specified.
maxWorkers: The maximum number of Google Compute Engine instances to be
@@ -3209,6 +3209,7 @@ class Job(_messages.Message):
attempts to create a job with the same name as an active job that
already exists, the attempt returns the existing job. The name must
match the regular expression `[a-z]([-a-z0-9]{0,1022}[a-z0-9])?`
+ pausable: Output only. Indicates whether the job can be paused.
pipelineDescription: Preliminary field: The format of this data may change
at any time. A description of the user pipeline and stages through which
it is executed. Created by Cloud Dataflow service. Only retrieved with
@@ -3498,22 +3499,23 @@ class Job(_messages.Message):
labels = _messages.MessageField('LabelsValue', 10)
location = _messages.StringField(11)
name = _messages.StringField(12)
- pipelineDescription = _messages.MessageField('PipelineDescription', 13)
- projectId = _messages.StringField(14)
- replaceJobId = _messages.StringField(15)
- replacedByJobId = _messages.StringField(16)
- requestedState = _messages.EnumField('RequestedStateValueValuesEnum', 17)
- runtimeUpdatableParams = _messages.MessageField('RuntimeUpdatableParams', 18)
- satisfiesPzi = _messages.BooleanField(19)
- satisfiesPzs = _messages.BooleanField(20)
- serviceResources = _messages.MessageField('ServiceResources', 21)
- stageStates = _messages.MessageField('ExecutionStageState', 22, repeated=True)
- startTime = _messages.StringField(23)
- steps = _messages.MessageField('Step', 24, repeated=True)
- stepsLocation = _messages.StringField(25)
- tempFiles = _messages.StringField(26, repeated=True)
- transformNameMapping = _messages.MessageField('TransformNameMappingValue', 27)
- type = _messages.EnumField('TypeValueValuesEnum', 28)
+ pausable = _messages.BooleanField(13)
+ pipelineDescription = _messages.MessageField('PipelineDescription', 14)
+ projectId = _messages.StringField(15)
+ replaceJobId = _messages.StringField(16)
+ replacedByJobId = _messages.StringField(17)
+ requestedState = _messages.EnumField('RequestedStateValueValuesEnum', 18)
+ runtimeUpdatableParams = _messages.MessageField('RuntimeUpdatableParams', 19)
+ satisfiesPzi = _messages.BooleanField(20)
+ satisfiesPzs = _messages.BooleanField(21)
+ serviceResources = _messages.MessageField('ServiceResources', 22)
+ stageStates = _messages.MessageField('ExecutionStageState', 23, repeated=True)
+ startTime = _messages.StringField(24)
+ steps = _messages.MessageField('Step', 25, repeated=True)
+ stepsLocation = _messages.StringField(26)
+ tempFiles = _messages.StringField(27, repeated=True)
+ transformNameMapping = _messages.MessageField('TransformNameMappingValue', 28)
+ type = _messages.EnumField('TypeValueValuesEnum', 29)
class JobExecutionDetails(_messages.Message):
@@ -5342,8 +5344,14 @@ class RuntimeUpdatableParams(_messages.Message):
during job creation.
Fields:
- acceptableBacklogDuration: Optional. The backlog threshold duration in
- seconds for autoscaling. Value must be non-negative.
+ acceptableBacklogDuration: Optional. Deprecated: Use `latency_tier`
+ instead. The backlog threshold duration in seconds for autoscaling.
+ Value must be non-negative.
+ autoscalingTier: Optional. Deprecated: Use `latency_tier` instead. The
+ backlog threshold tier for autoscaling. Value must be one of "low-
+ latency", "medium-latency", or "high-latency".
+ latencyTier: Optional. The backlog threshold tier for autoscaling. Value
+ must be one of "low-latency", "medium-latency", or "high-latency".
maxNumWorkers: The maximum number of workers to cap autoscaling at. This
field is currently only supported for Streaming Engine jobs.
minNumWorkers: The minimum number of workers to scale down to. This field
@@ -5357,9 +5365,11 @@ class RuntimeUpdatableParams(_messages.Message):
"""
acceptableBacklogDuration = _messages.StringField(1)
- maxNumWorkers = _messages.IntegerField(2, variant=_messages.Variant.INT32)
- minNumWorkers = _messages.IntegerField(3, variant=_messages.Variant.INT32)
- workerUtilizationHint = _messages.FloatField(4)
+ autoscalingTier = _messages.StringField(2)
+ latencyTier = _messages.StringField(3)
+ maxNumWorkers = _messages.IntegerField(4, variant=_messages.Variant.INT32)
+ minNumWorkers = _messages.IntegerField(5, variant=_messages.Variant.INT32)
+ workerUtilizationHint = _messages.FloatField(6)
class SDKInfo(_messages.Message):
@@ -7775,6 +7785,9 @@ class WorkerPool(_messages.Message):
defaultPackageSet: The default package set to install. This allows the
service to select a default set of packages which are useful to worker
harnesses written in a particular language.
+ diskProvisionedIops: Optional. IOPS provisioned for the root disk for VMs.
+ diskProvisionedThroughputMibps: Optional. Throughput provisioned for the
+ root disk for VMs.
diskSizeGb: Size of root disk for VMs, in GB. If zero or unspecified, the
service will attempt to choose a reasonable default.
diskSourceImage: Fully qualified source image for disks.
@@ -7938,25 +7951,27 @@ class WorkerPool(_messages.Message):
autoscalingSettings = _messages.MessageField('AutoscalingSettings', 1)
dataDisks = _messages.MessageField('Disk', 2, repeated=True)
defaultPackageSet = _messages.EnumField('DefaultPackageSetValueValuesEnum', 3)
- diskSizeGb = _messages.IntegerField(4, variant=_messages.Variant.INT32)
- diskSourceImage = _messages.StringField(5)
- diskType = _messages.StringField(6)
- ipConfiguration = _messages.EnumField('IpConfigurationValueValuesEnum', 7)
- kind = _messages.StringField(8)
- machineType = _messages.StringField(9)
- metadata = _messages.MessageField('MetadataValue', 10)
- network = _messages.StringField(11)
- numThreadsPerWorker = _messages.IntegerField(12, variant=_messages.Variant.INT32)
- numWorkers = _messages.IntegerField(13, variant=_messages.Variant.INT32)
- onHostMaintenance = _messages.StringField(14)
- packages = _messages.MessageField('Package', 15, repeated=True)
- poolArgs = _messages.MessageField('PoolArgsValue', 16)
- sdkHarnessContainerImages = _messages.MessageField('SdkHarnessContainerImage', 17, repeated=True)
- subnetwork = _messages.StringField(18)
- taskrunnerSettings = _messages.MessageField('TaskRunnerSettings', 19)
- teardownPolicy = _messages.EnumField('TeardownPolicyValueValuesEnum', 20)
- workerHarnessContainerImage = _messages.StringField(21)
- zone = _messages.StringField(22)
+ diskProvisionedIops = _messages.IntegerField(4)
+ diskProvisionedThroughputMibps = _messages.IntegerField(5)
+ diskSizeGb = _messages.IntegerField(6, variant=_messages.Variant.INT32)
+ diskSourceImage = _messages.StringField(7)
+ diskType = _messages.StringField(8)
+ ipConfiguration = _messages.EnumField('IpConfigurationValueValuesEnum', 9)
+ kind = _messages.StringField(10)
+ machineType = _messages.StringField(11)
+ metadata = _messages.MessageField('MetadataValue', 12)
+ network = _messages.StringField(13)
+ numThreadsPerWorker = _messages.IntegerField(14, variant=_messages.Variant.INT32)
+ numWorkers = _messages.IntegerField(15, variant=_messages.Variant.INT32)
+ onHostMaintenance = _messages.StringField(16)
+ packages = _messages.MessageField('Package', 17, repeated=True)
+ poolArgs = _messages.MessageField('PoolArgsValue', 18)
+ sdkHarnessContainerImages = _messages.MessageField('SdkHarnessContainerImage', 19, repeated=True)
+ subnetwork = _messages.StringField(20)
+ taskrunnerSettings = _messages.MessageField('TaskRunnerSettings', 21)
+ teardownPolicy = _messages.EnumField('TeardownPolicyValueValuesEnum', 22)
+ workerHarnessContainerImage = _messages.StringField(23)
+ zone = _messages.StringField(24)
class WorkerSettings(_messages.Message):