This is an automated email from the ASF dual-hosted git repository. damccorm pushed a commit to branch users/damccorm/gbekServiceOption in repository https://gitbox.apache.org/repos/asf/beam.git
commit b84663ea9f9b31eb5a7479b4ac2c8f58948a1978 Author: Danny Mccormick <dannymccorm...@google.com> AuthorDate: Thu Oct 9 10:12:04 2025 -0400 Add use_gbek service option when gbek option used --- .../org/apache/beam/runners/dataflow/DataflowRunner.java | 16 ++++++++++++++++ .../apache_beam/runners/dataflow/dataflow_runner.py | 14 ++++++++++++-- 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index de6a039b707..be2aade96e4 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -1283,6 +1283,22 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { options.as(SdkHarnessOptions.class).setEnableLogViaFnApi(true); } + // Add use_gbek to dataflow_service_options if gbek is set. + List<String> dataflowServiceOptions = options.getDataflowServiceOptions(); + if (dataflowServiceOptions == null) { + dataflowServiceOptions = new ArrayList<>(); + } + if (!Strings.isNullOrEmpty(options.as(DataflowPipelineDebugOptions.class).getGbek())) { + if (!dataflowServiceOptions.contains("use_gbek")) { + dataflowServiceOptions.add("use_gbek"); + } + } else if (dataflowServiceOptions.contains("use_gbek")) { + throw new IllegalArgumentException( + "Do not set use_gbek directly, pass in the --gbek pipeline option " + + "with a valid secret instead."); + } + options.setDataflowServiceOptions(dataflowServiceOptions); + logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline); logWarningIfBigqueryDLQUnused(pipeline); if (shouldActAsStreaming(pipeline)) { diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index 9e339e289ff..57aed7cf9be 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -602,8 +602,15 @@ def _check_and_add_missing_options(options): debug_options = options.view_as(DebugOptions) dataflow_service_options = options.view_as( GoogleCloudOptions).dataflow_service_options or [] - options.view_as( - GoogleCloudOptions).dataflow_service_options = dataflow_service_options + + # Add use_gbek to dataflow_service_options if gbek is set. + if options.view_as(SetupOptions).gbek: + if 'use_gbek' not in dataflow_service_options: + dataflow_service_options.append('use_gbek') + elif 'use_gbek' in dataflow_service_options: + raise ValueError( + 'Do not set use_gbek directly, pass in the --gbek pipeline option ' + 'with a valid secret instead.') _add_runner_v2_missing_options(options) @@ -614,6 +621,9 @@ def _check_and_add_missing_options(options): elif debug_options.lookup_experiment('enable_prime'): dataflow_service_options.append('enable_prime') + options.view_as( + GoogleCloudOptions).dataflow_service_options = dataflow_service_options + sdk_location = options.view_as(SetupOptions).sdk_location if 'dev' in beam.version.__version__ and sdk_location == 'default': raise ValueError(