This is an automated email from the ASF dual-hosted git repository.

Abacn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new aa5797f355a Add pipeline hash (#38357)
aa5797f355a is described below

commit aa5797f355a3c5f77879f1a189f53cf22c959a5d
Author: Tarun Annapareddy <[email protected]>
AuthorDate: Tue May 5 07:47:01 2026 -0700

    Add pipeline hash (#38357)
    
    * Add Pipeline Options
    
    * Add Pipeline Hash along with URL
    
    * fix go lang staging
    
    * Fix formatting
    
    * backmerge master
    
    * remove cache
    
    * fix test
    
    * fix proto hash creation
    
    * fix lint
    
    * move pipeline hash to sdkharness
    
    * fix spotless
---
 .../beam/runners/dataflow/DataflowRunner.java      |  3 ++
 sdks/go/pkg/beam/runners/dataflow/dataflow.go      |  8 +++-
 .../beam/runners/dataflow/dataflowlib/execute.go   |  9 +++-
 .../pkg/beam/runners/dataflow/dataflowlib/job.go   | 12 ++---
 .../beam/runners/dataflow/dataflowlib/job_test.go  | 51 +++++++++++++++++++++-
 .../apache/beam/sdk/options/SdkHarnessOptions.java |  6 +++
 .../runners/dataflow/internal/apiclient.py         | 13 ++++--
 .../runners/dataflow/internal/apiclient_test.py    | 35 +++++++++++++++
 8 files changed, 125 insertions(+), 12 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index b375de66188..ecc231ab825 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -1368,6 +1368,9 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
         options.getStager().stageToFile(serializedProtoPipeline, PIPELINE_FILE_NAME);
     dataflowOptions.setPipelineUrl(stagedPipeline.getLocation());
 
+    String pipelineProtoHash = Hashing.sha256().hashBytes(serializedProtoPipeline).toString();
+    options.as(SdkHarnessOptions.class).setPipelineProtoHash(pipelineProtoHash);
+
     if (useUnifiedWorker(options)) {
       LOG.info("Skipping v1 transform replacements since job will run on v2.");
     } else {
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow.go b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
index ecbfe53939e..e968911fcca 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflow.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
@@ -24,6 +24,8 @@ package dataflow
 
 import (
        "context"
+       "crypto/sha256"
+       "encoding/hex"
        "encoding/json"
        "flag"
        "fmt"
@@ -40,6 +42,7 @@ import (
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/pipelinex"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/hooks"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/protox"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/options/gcpopts"
@@ -239,7 +242,10 @@ func Execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult, error)
                log.Info(ctx, "Dry-run: not submitting job!")
 
                log.Info(ctx, model.String())
-               job, err := dataflowlib.Translate(ctx, model, opts, workerURL, modelURL)
+               modelBytes := protox.MustEncode(model)
+               hash := sha256.Sum256(modelBytes)
+               pipelineProtoHash := hex.EncodeToString(hash[:])
+               job, err := dataflowlib.Translate(ctx, model, opts, workerURL, modelURL, pipelineProtoHash)
                if err != nil {
                        return nil, err
                }
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go
index 806b8940ae9..396eefab731 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go
@@ -19,6 +19,8 @@ package dataflowlib
 
 import (
        "context"
+       "crypto/sha256"
+       "encoding/hex"
        "encoding/json"
        "os"
        "strings"
@@ -83,14 +85,17 @@ func Execute(ctx context.Context, raw *pipepb.Pipeline, opts *JobOptions, worker
        // (2) Upload model to GCS
        log.Info(ctx, raw.String())
 
-       if err := StageModel(ctx, opts.Project, modelURL, protox.MustEncode(raw)); err != nil {
+       modelBytes := protox.MustEncode(raw)
+       modelHash := sha256.Sum256(modelBytes)
+       pipelineProtoHash := hex.EncodeToString(modelHash[:])
+       if err := StageModel(ctx, opts.Project, modelURL, modelBytes); err != nil {
                return presult, err
        }
        log.Infof(ctx, "Staged model pipeline: %v", modelURL)
 
        // (3) Translate to v1b3 and submit
 
-       job, err := Translate(ctx, raw, opts, workerURL, modelURL)
+       job, err := Translate(ctx, raw, opts, workerURL, modelURL, pipelineProtoHash)
        if err != nil {
                return presult, err
        }
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go
index 2f8057b6d50..f0adb21cf71 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go
@@ -117,7 +117,7 @@ func containerImages(p *pipepb.Pipeline) ([]*df.SdkHarnessContainerImage, []stri
 }
 
 // Translate translates a pipeline to a Dataflow job.
-func Translate(ctx context.Context, p *pipepb.Pipeline, opts *JobOptions, workerURL, modelURL string) (*df.Job, error) {
+func Translate(ctx context.Context, p *pipepb.Pipeline, opts *JobOptions, workerURL, modelURL string, pipelineProtoHash string) (*df.Job, error) {
        // (1) Translate pipeline to v1b3 speak.
 
        jobType := "JOB_TYPE_BATCH"
@@ -181,10 +181,11 @@ func Translate(ctx context.Context, p *pipepb.Pipeline, opts *JobOptions, worker
                        SdkPipelineOptions: newMsg(pipelineOptions{
                                DisplayData: printOptions(opts, images),
                                Options: dataflowOptions{
-                                       PipelineURL:  modelURL,
-                                       Region:       opts.Region,
-                                       Experiments:  opts.Experiments,
-                                       TempLocation: opts.TempLocation,
+                                       PipelineURL:       modelURL,
+                                       PipelineProtoHash: pipelineProtoHash,
+                                       Region:            opts.Region,
+                                       Experiments:       opts.Experiments,
+                                       TempLocation:      opts.TempLocation,
                                },
                                GoOptions: opts.Options,
                        }),
@@ -359,6 +360,7 @@ func GetMetrics(ctx context.Context, client *df.Service, 
project, region, jobID
 type dataflowOptions struct {
        Experiments                    []string `json:"experiments,omitempty"`
        PipelineURL                    string   `json:"pipelineUrl"`
+       PipelineProtoHash              string   `json:"pipelineProtoHash,omitempty"`
        Region                         string   `json:"region"`
        TempLocation                   string   `json:"tempLocation"`
        DiskProvisionedIops            int64    `json:"diskProvisionedIops"`
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go
index 303fcb776bf..901adb6c7b7 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go
@@ -21,6 +21,9 @@ import (
        "reflect"
        "testing"
 
+       "encoding/json"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/protox"
        pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
@@ -293,7 +296,7 @@ func TestTranslate(t *testing.T) {
        workerURL := "gs://any-location/temp"
        modelURL := "gs://any-location/temp"
 
-       job, err := Translate(ctx, p, opts, workerURL, modelURL)
+       job, err := Translate(ctx, p, opts, workerURL, modelURL, "dummy-hash-12345")
        if err != nil {
                t.Fatalf("Translate(...) error = %v, want nil", err)
        }
@@ -310,3 +313,49 @@ func TestTranslate(t *testing.T) {
                t.Errorf("DiskProvisionedThroughputMibps = %v, want 200", wp.DiskProvisionedThroughputMibps)
        }
 }
+
+func TestTranslateWithPipelineHash(t *testing.T) {
+       p := &pipepb.Pipeline{
+               Components: &pipepb.Components{
+                       Environments: map[string]*pipepb.Environment{
+                               "env1": {
+                                       Payload: protox.MustEncode(&pipepb.DockerPayload{
+                                               ContainerImage: "dummy_image",
+                                       }),
+                               },
+                       },
+               },
+       }
+       opts := &JobOptions{
+               Name:    "test-job",
+               Project: "test-project",
+               Region:  "test-region",
+               Options: runtime.RawOptions{
+                       Options: make(map[string]string),
+               },
+       }
+
+       expectedHashStr := "dummy-hash-12345"
+
+       job, err := Translate(context.Background(), p, opts, "worker-url", "model-url", expectedHashStr)
+       if err != nil {
+               t.Fatalf("Translate failed: %v", err)
+       }
+
+       // Verify PipelineProtoHash
+       var recoveredOptions struct {
+               Options struct {
+                       PipelineURL       string `json:"pipelineUrl"`
+                       PipelineProtoHash string `json:"pipelineProtoHash"`
+               } `json:"options"`
+       }
+
+       rawOpts := job.Environment.SdkPipelineOptions
+       if err := json.Unmarshal(rawOpts, &recoveredOptions); err != nil {
+               t.Fatalf("Failed to unmarshal SdkPipelineOptions: %v", err)
+       }
+
+       if recoveredOptions.Options.PipelineProtoHash != expectedHashStr {
+               t.Errorf("Expected PipelineProtoHash %v, got %v", expectedHashStr, recoveredOptions.Options.PipelineProtoHash)
+       }
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java
index 831dd69ec95..7267dda9ed0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java
@@ -481,4 +481,10 @@ public interface SdkHarnessOptions extends PipelineOptions, MemoryMonitorOptions
       return GlobalOpenTelemetry.get();
     }
   }
+
+  /** The hex-encoded SHA256 hash of the staged portable pipeline proto. */
+  @Description("The hex-encoded SHA256 hash of the staged portable pipeline proto")
+  String getPipelineProtoHash();
+
+  void setPipelineProtoHash(String hash);
 }
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index 29cb3607148..f38a2ee34bb 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -97,7 +97,8 @@ class Environment(object):
       options,
       environment_version,
       proto_pipeline_staged_url,
-      proto_pipeline=None):
+      proto_pipeline=None,
+      pipeline_proto_hash=None):
     self.standard_options = options.view_as(StandardOptions)
     self.google_cloud_options = options.view_as(GoogleCloudOptions)
     self.worker_options = options.view_as(WorkerOptions)
@@ -279,6 +280,8 @@ class Environment(object):
           for k, v in sdk_pipeline_options.items() if v is not None
       }
       options_dict["pipelineUrl"] = proto_pipeline_staged_url
+      if pipeline_proto_hash:
+        options_dict["pipelineProtoHash"] = pipeline_proto_hash
       # Don't pass impersonate_service_account through to the harness.
       # Though impersonation should start a job, the workers should
       # not try to modify their credentials.
@@ -831,10 +834,13 @@ class DataflowApplicationClient(object):
     resources = self._stage_resources(job.proto_pipeline, job.options)
 
     # Stage proto pipeline.
+    serialized_pipeline = job.proto_pipeline.SerializeToString()
+    pipeline_proto_hash = hashlib.sha256(serialized_pipeline).hexdigest()
+
     self.stage_file_with_retry(
         job.google_cloud_options.staging_location,
         shared_names.STAGED_PIPELINE_FILENAME,
-        io.BytesIO(job.proto_pipeline.SerializeToString()))
+        io.BytesIO(serialized_pipeline))
 
     job.proto.environment = Environment(
         proto_pipeline_staged_url=FileSystems.join(
@@ -843,7 +849,8 @@ class DataflowApplicationClient(object):
         packages=resources,
         options=job.options,
         environment_version=self.environment_version,
-        proto_pipeline=job.proto_pipeline).proto
+        proto_pipeline=job.proto_pipeline,
+        pipeline_proto_hash=pipeline_proto_hash).proto
     _LOGGER.debug('JOB: %s', job)
 
   @retry.with_exponential_backoff(num_retries=3, initial_delay_secs=3)
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
index 66b1c8e1e5b..43f4c8a2151 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
@@ -19,6 +19,7 @@
 
 # pytype: skip-file
 
+import hashlib
 import io
 import itertools
 import json
@@ -97,6 +98,40 @@ class UtilTest(unittest.TestCase):
 
     self.assertEqual(pipeline_url.string_value, FAKE_PIPELINE_URL)
 
+  def test_pipeline_proto_hash(self):
+    pipeline_options = PipelineOptions(
+        ['--temp_location', 'gs://any-location/temp'])
+    proto_pipeline = beam_runner_api_pb2.Pipeline()
+    proto_pipeline.components.transforms['dummy'].unique_name = 'dummy'
+
+    expected_hash = hashlib.sha256(
+        proto_pipeline.SerializeToString()).hexdigest()
+
+    env = apiclient.Environment([],
+                                pipeline_options,
+                                '2.0.0',
+                                FAKE_PIPELINE_URL,
+                                proto_pipeline,
+                                pipeline_proto_hash=expected_hash)
+
+    recovered_options = None
+    for additionalProperty in env.proto.sdkPipelineOptions.additionalProperties:
+      if additionalProperty.key == 'options':
+        recovered_options = additionalProperty.value
+        break
+    else:
+      self.fail('No pipeline options found')
+
+    pipeline_proto_hash = None
+    for property in recovered_options.object_value.properties:
+      if property.key == 'pipelineProtoHash':
+        pipeline_proto_hash = property.value
+        break
+    else:
+      self.fail('No pipelineProtoHash found')
+
+    self.assertEqual(pipeline_proto_hash.string_value, expected_hash)
+
   def test_set_network(self):
     pipeline_options = PipelineOptions([
         '--network',

Reply via email to