This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch users/damccorm/gbekServiceOption
in repository https://gitbox.apache.org/repos/asf/beam.git

commit b84663ea9f9b31eb5a7479b4ac2c8f58948a1978
Author: Danny Mccormick <dannymccorm...@google.com>
AuthorDate: Thu Oct 9 10:12:04 2025 -0400

    Add use_gbek service option when the gbek option is used
---
 .../org/apache/beam/runners/dataflow/DataflowRunner.java | 16 ++++++++++++++++
 .../apache_beam/runners/dataflow/dataflow_runner.py      | 14 ++++++++++++--
 2 files changed, 28 insertions(+), 2 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index de6a039b707..be2aade96e4 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -1283,6 +1283,22 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
       options.as(SdkHarnessOptions.class).setEnableLogViaFnApi(true);
     }
 
+    // Add use_gbek to dataflow_service_options if gbek is set.
+    List<String> dataflowServiceOptions = options.getDataflowServiceOptions();
+    if (dataflowServiceOptions == null) {
+      dataflowServiceOptions = new ArrayList<>();
+    }
+    if (!Strings.isNullOrEmpty(options.as(DataflowPipelineDebugOptions.class).getGbek())) {
+      if (!dataflowServiceOptions.contains("use_gbek")) {
+        dataflowServiceOptions.add("use_gbek");
+      }
+    } else if (dataflowServiceOptions.contains("use_gbek")) {
+      throw new IllegalArgumentException(
+          "Do not set use_gbek directly, pass in the --gbek pipeline option "
+              + "with a valid secret instead.");
+    }
+    options.setDataflowServiceOptions(dataflowServiceOptions);
+
     logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);
     logWarningIfBigqueryDLQUnused(pipeline);
     if (shouldActAsStreaming(pipeline)) {
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 9e339e289ff..57aed7cf9be 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -602,8 +602,15 @@ def _check_and_add_missing_options(options):
   debug_options = options.view_as(DebugOptions)
   dataflow_service_options = options.view_as(
       GoogleCloudOptions).dataflow_service_options or []
-  options.view_as(
-      GoogleCloudOptions).dataflow_service_options = dataflow_service_options
+
+  # Add use_gbek to dataflow_service_options if gbek is set.
+  if options.view_as(SetupOptions).gbek:
+    if 'use_gbek' not in dataflow_service_options:
+      dataflow_service_options.append('use_gbek')
+  elif 'use_gbek' in dataflow_service_options:
+    raise ValueError(
+        'Do not set use_gbek directly, pass in the --gbek pipeline option '
+        'with a valid secret instead.')
 
   _add_runner_v2_missing_options(options)
 
@@ -614,6 +621,9 @@ def _check_and_add_missing_options(options):
   elif debug_options.lookup_experiment('enable_prime'):
     dataflow_service_options.append('enable_prime')
 
+  options.view_as(
+      GoogleCloudOptions).dataflow_service_options = dataflow_service_options
+
   sdk_location = options.view_as(SetupOptions).sdk_location
   if 'dev' in beam.version.__version__ and sdk_location == 'default':
     raise ValueError(

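For readers of the archive, here is a minimal standalone sketch of the validation the hunks above introduce, assuming only the names visible in the diff (gbek, use_gbek, dataflow_service_options). It paraphrases the logic in plain Python; the helper name is illustrative and this is not the Beam implementation itself.

# Hedged sketch: mirrors the gbek / use_gbek handling shown in the diff.
# Not part of the commit; resolve_service_options is an illustrative name.
from typing import List, Optional


def resolve_service_options(
    gbek: Optional[str],
    dataflow_service_options: Optional[List[str]]) -> List[str]:
  opts = list(dataflow_service_options or [])
  if gbek:
    # --gbek was supplied: make sure use_gbek is present exactly once.
    if 'use_gbek' not in opts:
      opts.append('use_gbek')
  elif 'use_gbek' in opts:
    # use_gbek without --gbek is rejected, matching both the Java and
    # Python changes above.
    raise ValueError(
        'Do not set use_gbek directly, pass in the --gbek pipeline option '
        'with a valid secret instead.')
  return opts


# Passing a gbek secret adds the service option automatically.
assert resolve_service_options('some-secret', None) == ['use_gbek']

Note that the Python change also moves the write-back of dataflow_service_options to GoogleCloudOptions from the top of _check_and_add_missing_options to after the enable_prime handling, so options appended later in the function are retained.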