This is an automated email from the ASF dual-hosted git repository.
Abacn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 400b114a0a2 Add Staged Artifact validations for RunnerV2 (#37974)
400b114a0a2 is described below
commit 400b114a0a2e33b7d19e2ab640a799d045e92342
Author: Tarun Annapareddy <[email protected]>
AuthorDate: Wed May 6 19:24:29 2026 -0700
Add Staged Artifact validations for RunnerV2 (#37974)
---
sdks/go/container/boot.go | 4 +
sdks/go/pkg/beam/artifact/materialize.go | 45 ++++++++-
sdks/go/pkg/beam/artifact/materialize_test.go | 110 +++++++++++++++++++++
sdks/go/pkg/beam/artifact/options.go | 48 +++++++++
sdks/go/pkg/beam/artifact/options_test.go | 78 +++++++++++++++
sdks/java/container/boot.go | 3 +
.../runners/dataflow/internal/apiclient.py | 5 +-
.../runners/dataflow/internal/apiclient_test.py | 42 ++++----
sdks/python/container/boot.go | 3 +
sdks/typescript/container/boot.go | 3 +
10 files changed, 316 insertions(+), 25 deletions(-)
diff --git a/sdks/go/container/boot.go b/sdks/go/container/boot.go
index ab2da316931..469285821f7 100644
--- a/sdks/go/container/boot.go
+++ b/sdks/go/container/boot.go
@@ -149,6 +149,7 @@ func main() {
log.Fatalf("Endpoint not set: %v", err)
}
logger := &tools.Logger{Endpoint: *loggingEndpoint}
+ log.SetOutput(tools.NewBufferedLoggerWithFlushInterval(ctx, logger, 0))
logger.Printf(ctx, "Initializing Go harness: %v", strings.Join(os.Args,
" "))
// (1) Obtain the pipeline options
@@ -158,6 +159,9 @@ func main() {
logger.Fatalf(ctx, "Failed to convert pipeline options: %v",
err)
}
+ // Inject artifact validation enabled state into context
+ ctx = artifact.WithArtifactValidation(ctx,
!artifact.HasExperiment(info.GetPipelineOptions(),
"disable_staged_file_integrity_checks"))
+
// (2) Retrieve the staged files.
//
// The Go SDK harness downloads the worker binary and invokes
diff --git a/sdks/go/pkg/beam/artifact/materialize.go
b/sdks/go/pkg/beam/artifact/materialize.go
index 624e30efcd2..db624f3776a 100644
--- a/sdks/go/pkg/beam/artifact/materialize.go
+++ b/sdks/go/pkg/beam/artifact/materialize.go
@@ -51,6 +51,23 @@ const (
NoArtifactsStaged = "__no_artifacts_staged__"
)
+type validationKey string
+
+const artifactValidationKey validationKey = "artifact_validation_enabled"
+
+// WithArtifactValidation returns a new context carrying the artifact
validation enabled state.
+func WithArtifactValidation(ctx context.Context, enabled bool) context.Context
{
+ return context.WithValue(ctx, artifactValidationKey, enabled)
+}
+
+// isArtifactValidationEnabled parses pipeline options to check if
"disable_integrity_checks" is enabled.
+func isArtifactValidationEnabled(ctx context.Context) bool {
+ if val, ok := ctx.Value(artifactValidationKey).(bool); ok {
+ return val
+ }
+ return true
+}
+
// Materialize is a convenience helper for ensuring that all artifacts are
// present and uncorrupted. It interprets each artifact name as a relative
// path under the dest directory. It does not retrieve valid artifacts already
@@ -131,6 +148,7 @@ func newMaterializeWithClient(ctx context.Context, client
jobpb.ArtifactRetrieva
RoleUrn: URNStagingTo,
RolePayload: rolePayload,
},
+ expectedSha256: filePayload.Sha256,
})
}
@@ -183,8 +201,9 @@ func MustExtractFilePayload(artifact
*pipepb.ArtifactInformation) (string, strin
}
type artifact struct {
- client jobpb.ArtifactRetrievalServiceClient
- dep *pipepb.ArtifactInformation
+ client jobpb.ArtifactRetrievalServiceClient
+ dep *pipepb.ArtifactInformation
+ expectedSha256 string
}
func (a artifact) retrieve(ctx context.Context, dest string) error {
@@ -231,7 +250,19 @@ func (a artifact) retrieve(ctx context.Context, dest
string) error {
stat, _ := fd.Stat()
log.Printf("Downloaded: %v (sha256: %v, size: %v)", filename,
sha256Hash, stat.Size())
- return fd.Close()
+ if err := fd.Close(); err != nil {
+ return err
+ }
+
+ if isArtifactValidationEnabled(ctx) {
+ if a.expectedSha256 == "" {
+ log.Printf("WARN: Artifact validation skipped for file:
%v", filename)
+ } else if sha256Hash != a.expectedSha256 {
+ return errors.Errorf("bad SHA256 for %v: %v, want %v",
filename, sha256Hash, a.expectedSha256)
+ }
+ }
+
+ return nil
}
func writeChunks(stream jobpb.ArtifactRetrievalService_GetArtifactClient, w
io.Writer) (string, error) {
@@ -442,8 +473,12 @@ func retrieve(ctx context.Context, client
jobpb.LegacyArtifactRetrievalServiceCl
}
// Artifact Sha256 hash is an optional field in metadata so we should
only validate when its present.
- if a.Sha256 != "" && sha256Hash != a.Sha256 {
- return errors.Errorf("bad SHA256 for %v: %v, want %v",
filename, sha256Hash, a.Sha256)
+ if isArtifactValidationEnabled(ctx) {
+ if a.Sha256 == "" {
+ log.Printf("WARN: Artifact validation skipped for file:
%v", filename)
+ } else if sha256Hash != a.Sha256 {
+ return errors.Errorf("bad SHA256 for %v: %v, want %v",
filename, sha256Hash, a.Sha256)
+ }
}
return nil
}
diff --git a/sdks/go/pkg/beam/artifact/materialize_test.go
b/sdks/go/pkg/beam/artifact/materialize_test.go
index 31890ed045c..bf27e13e8a8 100644
--- a/sdks/go/pkg/beam/artifact/materialize_test.go
+++ b/sdks/go/pkg/beam/artifact/materialize_test.go
@@ -82,6 +82,52 @@ func TestMultiRetrieve(t *testing.T) {
}
}
+func TestRetrieveWithBadShaFails(t *testing.T) {
+ cc := startServer(t)
+ defer cc.Close()
+
+ ctx := grpcx.WriteWorkerID(context.Background(), "idA")
+ keys := []string{"foo"}
+ st := "whatever"
+ rt, artifacts := populate(ctx, cc, t, keys, 300, st)
+
+ dst := makeTempDir(t)
+ defer os.RemoveAll(dst)
+
+ client := jobpb.NewLegacyArtifactRetrievalServiceClient(cc)
+ for _, a := range artifacts {
+ a.Sha256 = "badhash" // mutate hash
+ if err := Retrieve(ctx, client, a, rt, dst); err == nil {
+ t.Errorf("expected materialization to fail due to bad
sha256 mismatch")
+ }
+ }
+}
+
+func TestRetrieveWithBadShaAndExperimentSucceeds(t *testing.T) {
+ cc := startServer(t)
+ defer cc.Close()
+
+ ctx := WithArtifactValidation(grpcx.WriteWorkerID(context.Background(),
"idA"), false)
+ keys := []string{"foo"}
+ st := "whatever"
+ rt, artifacts := populate(ctx, cc, t, keys, 300, st)
+
+ dst := makeTempDir(t)
+ defer os.RemoveAll(dst)
+
+ client := jobpb.NewLegacyArtifactRetrievalServiceClient(cc)
+ for _, a := range artifacts {
+ originalHash := a.Sha256
+ a.Sha256 = "badhash" // mutate hash
+ filename := makeFilename(dst, a.Name)
+ if err := Retrieve(ctx, client, a, rt, dst); err != nil {
+ t.Errorf("materialize failed but should have succeeded
because validation was disabled via experiment: %v", err)
+ continue
+ }
+ verifySHA256(t, filename, originalHash)
+ }
+}
+
// populate stages a set of artifacts with the given keys, each with
// slightly different sizes and chucksizes.
func populate(ctx context.Context, cc *grpc.ClientConn, t *testing.T, keys
[]string, size int, st string) (string, []*jobpb.ArtifactMetadata) {
@@ -266,6 +312,55 @@ func TestNewRetrieveWithResolution(t *testing.T) {
checkStagedFiles(mds, dest, expected, t)
}
+func TestIsArtifactValidationEnabled(t *testing.T) {
+ ctx := context.Background()
+ if !isArtifactValidationEnabled(ctx) {
+ t.Errorf("empty context should have validation enabled")
+ }
+
+ ctx2 := WithArtifactValidation(ctx, false)
+ if isArtifactValidationEnabled(ctx2) {
+ t.Errorf("context with validation disabled should have
validation disabled")
+ }
+}
+
+func TestNewRetrieveWithBadShaFails(t *testing.T) {
+ expected := map[string]string{"a.txt": "a"}
+ client := &fakeRetrievalService{artifacts: expected}
+ dest := makeTempDir(t)
+ defer os.RemoveAll(dest)
+ ctx := grpcx.WriteWorkerID(context.Background(), "worker")
+
+ _, err := newMaterializeWithClient(ctx, client,
client.fileArtifactsWithBadSha(), dest)
+ if err == nil {
+ t.Fatalf("expected materialization to fail due to bad sha256
mismatch")
+ }
+}
+
+func TestNewRetrieveWithBadShaAndExperimentSucceeds(t *testing.T) {
+ expected := map[string]string{"a.txt": "a"}
+ client := &fakeRetrievalService{artifacts: expected}
+ dest := makeTempDir(t)
+ defer os.RemoveAll(dest)
+
+ ctx := WithArtifactValidation(grpcx.WriteWorkerID(context.Background(),
"worker"), false)
+
+ mds, err := newMaterializeWithClient(ctx, client,
client.fileArtifactsWithBadSha(), dest)
+ if err != nil {
+ t.Fatalf("materialize failed but should have succeeded because
validation was disabled via experiment: %v", err)
+ }
+
+ generated := make(map[string]string)
+ for _, md := range mds {
+ name, _ := MustExtractFilePayload(md)
+ payload, _ :=
proto.Marshal(&pipepb.ArtifactStagingToRolePayload{
+ StagedName: name})
+ generated[name] = string(payload)
+ }
+
+ checkStagedFiles(mds, dest, generated, t)
+}
+
func checkStagedFiles(mds []*pipepb.ArtifactInformation, dest string, expected
map[string]string, t *testing.T) {
if len(mds) != len(expected) {
t.Errorf("wrong number of artifacts staged %v vs %v", len(mds),
len(expected))
@@ -323,6 +418,21 @@ func (fake *fakeRetrievalService)
fileArtifactsWithoutStagingTo() []*pipepb.Arti
return artifacts
}
+func (fake *fakeRetrievalService) fileArtifactsWithBadSha()
[]*pipepb.ArtifactInformation {
+ var artifacts []*pipepb.ArtifactInformation
+ for name := range fake.artifacts {
+ payload, _ := proto.Marshal(&pipepb.ArtifactFilePayload{
+ Path: filepath.Join("/tmp", name),
+ Sha256: "badhash",
+ })
+ artifacts = append(artifacts, &pipepb.ArtifactInformation{
+ TypeUrn: URNFileArtifact,
+ TypePayload: payload,
+ })
+ }
+ return artifacts
+}
+
func (fake *fakeRetrievalService) urlArtifactsWithoutStagingTo()
[]*pipepb.ArtifactInformation {
var artifacts []*pipepb.ArtifactInformation
for name := range fake.artifacts {
diff --git a/sdks/go/pkg/beam/artifact/options.go
b/sdks/go/pkg/beam/artifact/options.go
new file mode 100644
index 00000000000..47356433161
--- /dev/null
+++ b/sdks/go/pkg/beam/artifact/options.go
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package artifact
+
+import (
+ structpb "google.golang.org/protobuf/types/known/structpb"
+)
+
+// GetExperiments extracts a list of experiments from the pipeline options.
+func GetExperiments(options *structpb.Struct) []string {
+ if options == nil {
+ return nil
+ }
+
+ var exps []string
+ // Try legacy style
+ for _, v := range
options.GetFields()["options"].GetStructValue().GetFields()["experiments"].GetListValue().GetValues()
{
+ exps = append(exps, v.GetStringValue())
+ }
+ // Try URN style
+ for _, v := range
options.GetFields()["beam:option:experiments:v1"].GetListValue().GetValues() {
+ exps = append(exps, v.GetStringValue())
+ }
+ return exps
+}
+
+// HasExperiment checks if a specific experiment is enabled in the pipeline
options.
+func HasExperiment(options *structpb.Struct, experiment string) bool {
+ for _, exp := range GetExperiments(options) {
+ if exp == experiment {
+ return true
+ }
+ }
+ return false
+}
diff --git a/sdks/go/pkg/beam/artifact/options_test.go
b/sdks/go/pkg/beam/artifact/options_test.go
new file mode 100644
index 00000000000..a9f0e4bb7e3
--- /dev/null
+++ b/sdks/go/pkg/beam/artifact/options_test.go
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package artifact
+
+import (
+ "testing"
+
+ structpb "google.golang.org/protobuf/types/known/structpb"
+)
+
+func TestGetExperiments_Nil(t *testing.T) {
+ if got := GetExperiments(nil); got != nil {
+ t.Errorf("GetExperiments(nil) = %v, want nil", got)
+ }
+}
+
+func TestGetExperiments_Legacy(t *testing.T) {
+ options, _ := structpb.NewStruct(map[string]interface{}{
+ "options": map[string]interface{}{
+ "experiments": []interface{}{"exp1", "exp2"},
+ },
+ })
+ exps := GetExperiments(options)
+ if len(exps) != 2 || exps[0] != "exp1" || exps[1] != "exp2" {
+ t.Errorf("GetExperiments() = %v, want [exp1 exp2]", exps)
+ }
+}
+
+func TestGetExperiments_URN(t *testing.T) {
+ urnOptions, _ := structpb.NewStruct(map[string]interface{}{
+ "beam:option:experiments:v1": []interface{}{"expA", "expB"},
+ })
+ expsURN := GetExperiments(urnOptions)
+ if len(expsURN) != 2 || expsURN[0] != "expA" || expsURN[1] != "expB" {
+ t.Errorf("GetExperiments() = %v, want [expA expB]", expsURN)
+ }
+}
+
+func TestHasExperiment(t *testing.T) {
+ options, _ := structpb.NewStruct(map[string]interface{}{
+ "options": map[string]interface{}{
+ "experiments": []interface{}{"exp1", "exp2"},
+ },
+ })
+
+ if !HasExperiment(options, "exp1") {
+ t.Errorf("HasExperiment(exp1) = false, want true")
+ }
+ if HasExperiment(options, "exp3") {
+ t.Errorf("HasExperiment(exp3) = true, want false")
+ }
+}
+
+func TestGetExperiments_Combined(t *testing.T) {
+ options, _ := structpb.NewStruct(map[string]interface{}{
+ "options": map[string]interface{}{
+ "experiments": []interface{}{"exp1", "exp2"},
+ },
+ "beam:option:experiments:v1": []interface{}{"expA", "expB"},
+ })
+ exps := GetExperiments(options)
+ if len(exps) != 4 || exps[0] != "exp1" || exps[1] != "exp2" || exps[2]
!= "expA" || exps[3] != "expB" {
+ t.Errorf("GetExperiments() = %v, want [exp1 exp2 expA expB]",
exps)
+ }
+}
diff --git a/sdks/java/container/boot.go b/sdks/java/container/boot.go
index 8c918f23179..3ce79e4927e 100644
--- a/sdks/java/container/boot.go
+++ b/sdks/java/container/boot.go
@@ -105,6 +105,9 @@ func main() {
logger.Fatalf(ctx, "Failed to convert pipeline options: %v",
err)
}
+ // Inject artifact validation enabled state into context
+ ctx = artifact.WithArtifactValidation(ctx,
!artifact.HasExperiment(info.GetPipelineOptions(),
"disable_staged_file_integrity_checks"))
+
// (2) Retrieve the staged user jars. We ignore any disk limit,
// because the staged jars are mandatory.
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index f38a2ee34bb..097523a5131 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -600,8 +600,9 @@ class DataflowApplicationClient(object):
else:
remote_name = os.path.basename(type_payload.path)
is_staged_role = False
-
- if self._enable_caching and not type_payload.sha256:
+ # compute sha256 even if caching is disabled.
+ # This is used to check the payload integrity along with caching.
+ if not type_payload.sha256:
type_payload.sha256 = self._compute_sha256(type_payload.path)
if type_payload.sha256 and type_payload.sha256 in staged_hashes:
diff --git
a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
index 43f4c8a2151..43f51d0b39f 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
@@ -1375,13 +1375,19 @@ class UtilTest(unittest.TestCase):
])
}))
client = apiclient.DataflowApplicationClient(pipeline_options)
- with mock.patch.object(apiclient._LegacyDataflowStager,
- 'stage_job_resources') as mock_stager:
- client._stage_resources(pipeline, pipeline_options)
+ with mock.patch.object(apiclient.DataflowApplicationClient,
+ '_compute_sha256',
+ side_effect=lambda path: 'hash' + path):
+ with mock.patch.object(apiclient._LegacyDataflowStager,
+ 'stage_job_resources') as mock_stager:
+ client._stage_resources(pipeline, pipeline_options)
mock_stager.assert_called_once_with(
- [('/tmp/foo1', 'foo1', ''), ('/tmp/bar1', 'bar1', ''),
- ('/tmp/baz', 'baz1', ''), ('/tmp/renamed1', 'renamed1', 'abcdefg'),
- ('/tmp/foo2', 'foo2', ''), ('/tmp/bar2', 'bar2', '')],
+ [('/tmp/foo1', 'foo1', 'hash/tmp/foo1'),
+ ('/tmp/bar1', 'bar1', 'hash/tmp/bar1'),
+ ('/tmp/baz', 'baz1', 'hash/tmp/baz'),
+ ('/tmp/renamed1', 'renamed1', 'abcdefg'),
+ ('/tmp/foo2', 'foo2', 'hash/tmp/foo2'),
+ ('/tmp/bar2', 'bar2', 'hash/tmp/bar2')],
staging_location='gs://test-location/staging')
pipeline_expected = beam_runner_api_pb2.Pipeline(
@@ -1392,8 +1398,8 @@ class UtilTest(unittest.TestCase):
beam_runner_api_pb2.ArtifactInformation(
type_urn=common_urns.artifact_types.URL.urn,
type_payload=beam_runner_api_pb2.ArtifactUrlPayload(
- url='gs://test-location/staging/foo1'
- ).SerializeToString(),
+ url='gs://test-location/staging/foo1',
+ sha256='hash/tmp/foo1').SerializeToString(),
role_urn=common_urns.artifact_roles.STAGING_TO.urn,
role_payload=beam_runner_api_pb2.
ArtifactStagingToRolePayload(
@@ -1401,8 +1407,8 @@ class UtilTest(unittest.TestCase):
beam_runner_api_pb2.ArtifactInformation(
type_urn=common_urns.artifact_types.URL.urn,
type_payload=beam_runner_api_pb2.ArtifactUrlPayload(
- url='gs://test-location/staging/bar1').
- SerializeToString(),
+ url='gs://test-location/staging/bar1',
+ sha256='hash/tmp/bar1').SerializeToString(),
role_urn=common_urns.artifact_roles.STAGING_TO.urn,
role_payload=beam_runner_api_pb2.
ArtifactStagingToRolePayload(
@@ -1410,8 +1416,8 @@ class UtilTest(unittest.TestCase):
beam_runner_api_pb2.ArtifactInformation(
type_urn=common_urns.artifact_types.URL.urn,
type_payload=beam_runner_api_pb2.ArtifactUrlPayload(
- url='gs://test-location/staging/baz1').
- SerializeToString(),
+ url='gs://test-location/staging/baz1',
+ sha256='hash/tmp/baz').SerializeToString(),
role_urn=common_urns.artifact_roles.STAGING_TO.urn,
role_payload=beam_runner_api_pb2.
ArtifactStagingToRolePayload(
@@ -1431,8 +1437,8 @@ class UtilTest(unittest.TestCase):
beam_runner_api_pb2.ArtifactInformation(
type_urn=common_urns.artifact_types.URL.urn,
type_payload=beam_runner_api_pb2.ArtifactUrlPayload(
- url='gs://test-location/staging/foo2').
- SerializeToString(),
+ url='gs://test-location/staging/foo2',
+ sha256='hash/tmp/foo2').SerializeToString(),
role_urn=common_urns.artifact_roles.STAGING_TO.urn,
role_payload=beam_runner_api_pb2.
ArtifactStagingToRolePayload(
@@ -1440,8 +1446,8 @@ class UtilTest(unittest.TestCase):
beam_runner_api_pb2.ArtifactInformation(
type_urn=common_urns.artifact_types.URL.urn,
type_payload=beam_runner_api_pb2.ArtifactUrlPayload(
- url='gs://test-location/staging/bar2').
- SerializeToString(),
+ url='gs://test-location/staging/bar2',
+ sha256='hash/tmp/bar2').SerializeToString(),
role_urn=common_urns.artifact_roles.STAGING_TO.urn,
role_payload=beam_runner_api_pb2.
ArtifactStagingToRolePayload(
@@ -1449,8 +1455,8 @@ class UtilTest(unittest.TestCase):
beam_runner_api_pb2.ArtifactInformation(
type_urn=common_urns.artifact_types.URL.urn,
type_payload=beam_runner_api_pb2.ArtifactUrlPayload(
- url='gs://test-location/staging/baz1').
- SerializeToString(),
+ url='gs://test-location/staging/baz1',
+ sha256='hash/tmp/baz').SerializeToString(),
role_urn=common_urns.artifact_roles.STAGING_TO.urn,
role_payload=beam_runner_api_pb2.
ArtifactStagingToRolePayload(
diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go
index a2655903a4b..f5a37c9cca0 100644
--- a/sdks/python/container/boot.go
+++ b/sdks/python/container/boot.go
@@ -184,6 +184,9 @@ func launchSDKProcess() error {
logger.Fatalf(ctx, "Failed to convert pipeline options: %v",
err)
}
+ // Inject artifact validation enabled state into context
+ ctx = artifact.WithArtifactValidation(ctx,
!artifact.HasExperiment(info.GetPipelineOptions(),
"disable_staged_file_integrity_checks"))
+
experiments := getExperiments(options)
logger.Printf(ctx, "Experiments=%v", experiments)
diff --git a/sdks/typescript/container/boot.go
b/sdks/typescript/container/boot.go
index 44f94f80433..95e26124fac 100644
--- a/sdks/typescript/container/boot.go
+++ b/sdks/typescript/container/boot.go
@@ -91,6 +91,9 @@ func main() {
logger.Fatalf(ctx, "Failed to convert pipeline options: %v",
err)
}
+ // Inject artifact validation enabled state into context
+ ctx = artifact.WithArtifactValidation(ctx,
!artifact.HasExperiment(info.GetPipelineOptions(),
"disable_staged_file_integrity_checks"))
+
// (2) Retrieve and install the staged packages.
dir := filepath.Join(*semiPersistDir, *id, "staged")