This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch users/damccorm/combineValuesJava
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 55b9432b41aafbe385418a1a2bd62f3d498be712
Author: Danny Mccormick <[email protected]>
AuthorDate: Mon Oct 6 10:15:09 2025 -0400

    [WIP] CombinePerKey with gbek (Java)
---
 .github/trigger_files/beam_PostCommit_Java.json    |  2 +-
 .../beam_PostCommit_Java_DataflowV1.json           |  2 +-
 .../beam_PostCommit_Java_DataflowV2.json           |  2 +-
 .../apache/beam/sdk/transforms/GroupByKeyIT.java   | 48 ++++++++++++++++++++++
 4 files changed, 51 insertions(+), 3 deletions(-)

diff --git a/.github/trigger_files/beam_PostCommit_Java.json 
b/.github/trigger_files/beam_PostCommit_Java.json
index 8784d0786c0..53d94cfc4f1 100644
--- a/.github/trigger_files/beam_PostCommit_Java.json
+++ b/.github/trigger_files/beam_PostCommit_Java.json
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to 
run",
-  "modification": 2
+  "modification": 3
 }
\ No newline at end of file
diff --git a/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json 
b/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json
index 42fb8f985ba..78b2bdb93e2 100644
--- a/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json
+++ b/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json
@@ -3,6 +3,6 @@
   "https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder",
   "https://github.com/apache/beam/pull/35177": "Introducing 
WindowedValueReceiver to runners",
     "comment": "Modify this file in a trivial way to cause this test suite to 
run",
-    "modification": 2,
+    "modification": 3,
   "https://github.com/apache/beam/pull/35159": "moving WindowedValue and 
making an interface"
 }
diff --git a/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json 
b/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json
index 3717f48ee49..1c13e3f07b2 100644
--- a/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json
+++ b/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json
@@ -1,4 +1,4 @@
 {
-  "modification": 4,
+  "modification": 5,
   "https://github.com/apache/beam/pull/35159": "moving WindowedValue and 
making an interface"
 }
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java
index d1fa43be285..cbbc5d11f04 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java
@@ -189,4 +189,52 @@ public class GroupByKeyIT {
     thrown.expect(RuntimeException.class);
     p.run();
   }
+
+  // Combine.PerKey depends on GBK under the hood, but can be overriden by a 
runner. This can
+  // fail unless it is handled specially, so we should test it specifically
+  @Test
+  public void testCombinePerKeyWithValidGcpSecretOption() throws Exception {
+    if (gcpSecretVersionName == null) {
+      // Skip test if we couldn't set up secret manager
+      return;
+    }
+    PipelineOptions options = TestPipeline.testingPipelineOptions();
+    options.setGBEK(String.format("type:gcpsecret;version_name:%s", 
gcpSecretVersionName));
+    Pipeline p = Pipeline.create(options);
+
+    List<KV<String, Integer>> ungroupedPairs =
+        Arrays.asList(
+            KV.of("k1", 3),
+            KV.of("k2", 66),
+            KV.of("k1", 4),
+            KV.of("k2", -33),
+            KV.of("k3", 0));
+    List<KV<String, Integer>> sums =
+        Arrays.asList(
+            KV.of("k1", 7),
+            KV.of("k2", 33),
+            KV.of("k3", 0));
+    PCollection<KV<String, Integer>> input =
+        p.apply(
+            Create.of(ungroupedPairs)
+                .withCoder(KvCoder.of(StringUtf8Coder.of(), 
VarIntCoder.of())));
+    PCollection<KV<String, Integer>> output = 
input.apply(Combine.perKey(Sum.ofIntegers()));
+    PAssert.that(output).containsInAnyOrder(sums);
+
+    p.run();
+  }
+
+  @Test
+  public void testCombinePerKeyWithInvalidGcpSecretOption() throws Exception {
+    if (gcpSecretVersionName == null) {
+      // Skip test if we couldn't set up secret manager
+      return;
+    }
+    PipelineOptions options = TestPipeline.testingPipelineOptions();
+    options.setGBEK("type:gcpsecret;version_name:bad_path/versions/latest");
+    Pipeline p = Pipeline.create(options);
+    p.apply(Create.of(KV.of("k1", 1))).apply(Combine.perKey(Sum.ofIntegers()));
+    thrown.expect(RuntimeException.class);
+    p.run();
+  }
 }

Reply via email to