This is an automated email from the ASF dual-hosted git repository. damccorm pushed a commit to branch users/damccorm/combineValuesJava in repository https://gitbox.apache.org/repos/asf/beam.git
commit 55b9432b41aafbe385418a1a2bd62f3d498be712 Author: Danny Mccormick <[email protected]> AuthorDate: Mon Oct 6 10:15:09 2025 -0400 [WIP] CombinePerKey with gbek (Java) --- .github/trigger_files/beam_PostCommit_Java.json | 2 +- .../beam_PostCommit_Java_DataflowV1.json | 2 +- .../beam_PostCommit_Java_DataflowV2.json | 2 +- .../apache/beam/sdk/transforms/GroupByKeyIT.java | 48 ++++++++++++++++++++++ 4 files changed, 51 insertions(+), 3 deletions(-) diff --git a/.github/trigger_files/beam_PostCommit_Java.json b/.github/trigger_files/beam_PostCommit_Java.json index 8784d0786c0..53d94cfc4f1 100644 --- a/.github/trigger_files/beam_PostCommit_Java.json +++ b/.github/trigger_files/beam_PostCommit_Java.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 2 + "modification": 3 } \ No newline at end of file diff --git a/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json b/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json index 42fb8f985ba..78b2bdb93e2 100644 --- a/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json +++ b/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json @@ -3,6 +3,6 @@ "https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder", "https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners", "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 2, + "modification": 3, "https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface" } diff --git a/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json b/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json index 3717f48ee49..1c13e3f07b2 100644 --- a/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json +++ b/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json @@ -1,4 +1,4 @@ { - "modification": 4, + "modification": 5, "https://github.com/apache/beam/pull/35159": "moving 
WindowedValue and making an interface" } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java index d1fa43be285..cbbc5d11f04 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java @@ -189,4 +189,52 @@ public class GroupByKeyIT { thrown.expect(RuntimeException.class); p.run(); } + + // Combine.PerKey depends on GBK under the hood, but can be overridden by a runner. This can + // fail unless it is handled specially, so we should test it specifically + @Test + public void testCombinePerKeyWithValidGcpSecretOption() throws Exception { + if (gcpSecretVersionName == null) { + // Skip test if we couldn't set up secret manager + return; + } + PipelineOptions options = TestPipeline.testingPipelineOptions(); + options.setGBEK(String.format("type:gcpsecret;version_name:%s", gcpSecretVersionName)); + Pipeline p = Pipeline.create(options); + + List<KV<String, Integer>> ungroupedPairs = + Arrays.asList( + KV.of("k1", 3), + KV.of("k2", 66), + KV.of("k1", 4), + KV.of("k2", -33), + KV.of("k3", 0)); + List<KV<String, Integer>> sums = + Arrays.asList( + KV.of("k1", 7), + KV.of("k2", 33), + KV.of("k3", 0)); + PCollection<KV<String, Integer>> input = + p.apply( + Create.of(ungroupedPairs) + .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))); + PCollection<KV<String, Integer>> output = input.apply(Combine.perKey(Sum.ofIntegers())); + PAssert.that(output).containsInAnyOrder(sums); + + p.run(); + } + + @Test + public void testCombinePerKeyWithInvalidGcpSecretOption() throws Exception { + if (gcpSecretVersionName == null) { + // Skip test if we couldn't set up secret manager + return; + } + PipelineOptions options = TestPipeline.testingPipelineOptions(); + options.setGBEK("type:gcpsecret;version_name:bad_path/versions/latest"); + Pipeline p = 
Pipeline.create(options); + p.apply(Create.of(KV.of("k1", 1))).apply(Combine.perKey(Sum.ofIntegers())); + thrown.expect(RuntimeException.class); + p.run(); + } }
