This is an automated email from the ASF dual-hosted git repository.

damondouglas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 104c10b3ee5 [RRIO] Create Caller and SetupTeardown interfaces (#28905)
104c10b3ee5 is described below

commit 104c10b3ee536a9a3ea52b4dbf62d86b669da5d9
Author: Damon <damondoug...@users.noreply.github.com>
AuthorDate: Wed Oct 11 12:39:38 2023 -0700

    [RRIO] Create Caller and SetupTeardown interfaces (#28905)
    
    * Create test Caller and SetupTeardown interfaces
    
    * Update Javadoc
    
    * Defer Call transform to future PR
    
    * Rename package to requestresponseio
    
    * Add username to TODO
---
 sdks/java/io/rrio/build.gradle                     |   7 +-
 .../apache/beam/io/requestresponseio/Caller.java   |  27 +++++
 .../beam/io/requestresponseio/SetupTeardown.java   |  34 ++++++
 .../UserCodeExecutionException.java                |  38 ++++++
 .../requestresponseio/UserCodeQuotaException.java  |  42 +++++++
 .../UserCodeTimeoutException.java                  |  39 ++++++
 .../beam/io/requestresponseio/package-info.java    |  20 ++++
 .../java/org/apache/beam/io/rrio/CallerTest.java   | 126 ++++++++++++++++++++
 .../org/apache/beam/io/rrio/SetupTeardownTest.java | 132 +++++++++++++++++++++
 9 files changed, 462 insertions(+), 3 deletions(-)

diff --git a/sdks/java/io/rrio/build.gradle b/sdks/java/io/rrio/build.gradle
index d65df370e0c..32fbd9d22e3 100644
--- a/sdks/java/io/rrio/build.gradle
+++ b/sdks/java/io/rrio/build.gradle
@@ -25,9 +25,10 @@ description = "Apache Beam :: SDKS :: Java :: IO :: 
RequestResponseIO (RRIO)"
 ext.summary = "Support to read from and write to Web APIs"
 
 dependencies {
-    implementation project(path: ":sdks:java:core", configuration: "shadow")
-    implementation library.java.joda_time
-    implementation library.java.vendored_guava_32_1_2_jre
+    // TODO(damondouglas): revert to implementation after project is more 
fully developed
+    permitUnusedDeclared project(path: ":sdks:java:core", configuration: 
"shadow")
+    permitUnusedDeclared library.java.joda_time
+    permitUnusedDeclared library.java.vendored_guava_32_1_2_jre
 
     testImplementation project(path: ":sdks:java:core", configuration: 
"shadowTest")
     testImplementation library.java.junit
diff --git 
a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/Caller.java
 
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/Caller.java
new file mode 100644
index 00000000000..32b514c43a1
--- /dev/null
+++ 
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/Caller.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.requestresponseio;
+
+import java.io.Serializable;
+
+/** {@link Caller} interfaces user custom code intended for API calls. */
+public interface Caller<RequestT, ResponseT> extends Serializable {
+
+  /** Calls a Web API with the {@link RequestT} and returns a {@link 
ResponseT}. */
+  ResponseT call(RequestT request) throws UserCodeExecutionException;
+}
diff --git 
a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/SetupTeardown.java
 
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/SetupTeardown.java
new file mode 100644
index 00000000000..2bdc8113d98
--- /dev/null
+++ 
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/SetupTeardown.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.requestresponseio;
+
+import java.io.Serializable;
+
+/**
+ * Provided by user and called within {@link 
org.apache.beam.sdk.transforms.DoFn.Setup} and @{link
+ * org.apache.beam.sdk.transforms.DoFn.Teardown} lifecycle methods of {@link 
Call}'s {@link
+ * org.apache.beam.sdk.transforms.DoFn}.
+ */
+public interface SetupTeardown extends Serializable {
+
+  /** Called during the {@link org.apache.beam.sdk.transforms.DoFn}'s setup 
lifecycle method. */
+  void setup() throws UserCodeExecutionException;
+
+  /** Called during the {@link org.apache.beam.sdk.transforms.DoFn}'s teardown 
lifecycle method. */
+  void teardown() throws UserCodeExecutionException;
+}
diff --git 
a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/UserCodeExecutionException.java
 
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/UserCodeExecutionException.java
new file mode 100644
index 00000000000..3a4c002f52e
--- /dev/null
+++ 
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/UserCodeExecutionException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.requestresponseio;
+
+/** Base {@link Exception} for signaling errors in user custom code. */
+public class UserCodeExecutionException extends Exception {
+  public UserCodeExecutionException(String message) {
+    super(message);
+  }
+
+  public UserCodeExecutionException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public UserCodeExecutionException(Throwable cause) {
+    super(cause);
+  }
+
+  public UserCodeExecutionException(
+      String message, Throwable cause, boolean enableSuppression, boolean 
writableStackTrace) {
+    super(message, cause, enableSuppression, writableStackTrace);
+  }
+}
diff --git 
a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/UserCodeQuotaException.java
 
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/UserCodeQuotaException.java
new file mode 100644
index 00000000000..f16f078927f
--- /dev/null
+++ 
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/UserCodeQuotaException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.requestresponseio;
+
+/**
+ * Extends {@link UserCodeQuotaException} to allow the user custom code to 
specifically signal a
+ * Quota or API overuse related error.
+ */
+public class UserCodeQuotaException extends UserCodeExecutionException {
+
+  public UserCodeQuotaException(String message) {
+    super(message);
+  }
+
+  public UserCodeQuotaException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public UserCodeQuotaException(Throwable cause) {
+    super(cause);
+  }
+
+  public UserCodeQuotaException(
+      String message, Throwable cause, boolean enableSuppression, boolean 
writableStackTrace) {
+    super(message, cause, enableSuppression, writableStackTrace);
+  }
+}
diff --git 
a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/UserCodeTimeoutException.java
 
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/UserCodeTimeoutException.java
new file mode 100644
index 00000000000..22b06744985
--- /dev/null
+++ 
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/UserCodeTimeoutException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.requestresponseio;
+
+/** An extension of {@link UserCodeQuotaException} to specifically signal a 
user code timeout. */
+public class UserCodeTimeoutException extends UserCodeExecutionException {
+
+  public UserCodeTimeoutException(String message) {
+    super(message);
+  }
+
+  public UserCodeTimeoutException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public UserCodeTimeoutException(Throwable cause) {
+    super(cause);
+  }
+
+  public UserCodeTimeoutException(
+      String message, Throwable cause, boolean enableSuppression, boolean 
writableStackTrace) {
+    super(message, cause, enableSuppression, writableStackTrace);
+  }
+}
diff --git 
a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/package-info.java
 
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/package-info.java
new file mode 100644
index 00000000000..cd9c11c13f8
--- /dev/null
+++ 
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Package provides Beam I/O transform support for safely reading from and 
writing to Web APIs. */
+package org.apache.beam.io.requestresponseio;
diff --git 
a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/rrio/CallerTest.java 
b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/rrio/CallerTest.java
new file mode 100644
index 00000000000..5258573f428
--- /dev/null
+++ b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/rrio/CallerTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.rrio;
+
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+import org.apache.beam.io.requestresponseio.Caller;
+import org.apache.beam.io.requestresponseio.UserCodeExecutionException;
+import org.apache.beam.io.requestresponseio.UserCodeQuotaException;
+import org.apache.beam.io.requestresponseio.UserCodeTimeoutException;
+import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link Caller}. */
+@RunWith(JUnit4.class)
+public class CallerTest {
+
+  @Rule public TestPipeline pipeline = TestPipeline.create();
+
+  @Test
+  public void canSerializeImplementingClasses() {
+    SerializableUtils.serializeToByteArray(new CallerImpl());
+  }
+
+  @Test
+  public void canSerializeWhenUsedInDoFn() {
+    pipeline
+        .apply(Create.of(Instant.now()))
+        .apply(ParDo.of(new CallerUsingDoFn<>(new CallerImpl())))
+        .setCoder(StringUtf8Coder.of());
+
+    pipeline.run();
+  }
+
+  @Test
+  public void canSignalQuotaException() {
+    pipeline
+        .apply(Create.of(1))
+        .apply(ParDo.of(new CallerUsingDoFn<>(new 
CallerThrowsQuotaException())))
+        .setCoder(VarIntCoder.of());
+
+    PipelineExecutionException executionException =
+        assertThrows(PipelineExecutionException.class, pipeline::run);
+    assertEquals(UserCodeQuotaException.class, 
executionException.getCause().getClass());
+  }
+
+  @Test
+  public void canSignalTimeoutException() {
+    pipeline
+        .apply(Create.of(1))
+        .apply(ParDo.of(new CallerUsingDoFn<>(new 
CallerThrowsTimeoutException())))
+        .setCoder(VarIntCoder.of());
+
+    PipelineExecutionException executionException =
+        assertThrows(PipelineExecutionException.class, pipeline::run);
+    assertEquals(UserCodeTimeoutException.class, 
executionException.getCause().getClass());
+  }
+
+  private static class CallerUsingDoFn<RequestT, ResponseT> extends 
DoFn<RequestT, ResponseT> {
+    private final Caller<RequestT, ResponseT> caller;
+
+    private CallerUsingDoFn(Caller<RequestT, ResponseT> caller) {
+      this.caller = caller;
+    }
+
+    @ProcessElement
+    public void process(@Element RequestT request, OutputReceiver<ResponseT> 
receiver)
+        throws UserCodeExecutionException {
+      RequestT safeRequest = checkStateNotNull(request);
+      ResponseT response = caller.call(safeRequest);
+      receiver.output(response);
+    }
+  }
+
+  private static class CallerImpl implements Caller<Instant, String> {
+
+    @Override
+    public String call(Instant request) throws UserCodeExecutionException {
+      return request.toString();
+    }
+  }
+
+  private static class CallerThrowsQuotaException implements Caller<Integer, 
Integer> {
+
+    @Override
+    public Integer call(Integer request) throws UserCodeExecutionException {
+      throw new UserCodeQuotaException("quota");
+    }
+  }
+
+  private static class CallerThrowsTimeoutException implements Caller<Integer, 
Integer> {
+
+    @Override
+    public Integer call(Integer request) throws UserCodeExecutionException {
+      throw new UserCodeTimeoutException("timeout");
+    }
+  }
+}
diff --git 
a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/rrio/SetupTeardownTest.java
 
b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/rrio/SetupTeardownTest.java
new file mode 100644
index 00000000000..a8c5c45ede5
--- /dev/null
+++ 
b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/rrio/SetupTeardownTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.rrio;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+import org.apache.beam.io.requestresponseio.SetupTeardown;
+import org.apache.beam.io.requestresponseio.UserCodeExecutionException;
+import org.apache.beam.io.requestresponseio.UserCodeQuotaException;
+import org.apache.beam.io.requestresponseio.UserCodeTimeoutException;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.UserCodeException;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class SetupTeardownTest {
+  @Rule public TestPipeline pipeline = TestPipeline.create();
+
+  @Test
+  public void canSerializeImplementingClasses() {
+    SerializableUtils.serializeToByteArray(new SetupTeardownImpl());
+  }
+
+  @Test
+  public void canSerializeWhenUsedInDoFn() {
+    pipeline
+        .apply(Create.of(1))
+        .apply(ParDo.of(new SetupTeardownUsingDoFn(new SetupTeardownImpl())))
+        .setCoder(VarIntCoder.of());
+
+    pipeline.run();
+  }
+
+  @Test
+  public void canSignalQuotaException() {
+    pipeline
+        .apply(Create.of(1))
+        .apply(ParDo.of(new SetupTeardownUsingDoFn(new 
ThrowsQuotaException())))
+        .setCoder(VarIntCoder.of());
+
+    UncheckedExecutionException exception =
+        assertThrows(UncheckedExecutionException.class, pipeline::run);
+    UserCodeException userCodeException = (UserCodeException) 
exception.getCause();
+    assertEquals(UserCodeQuotaException.class, 
userCodeException.getCause().getClass());
+  }
+
+  @Test
+  public void canSignalTimeoutException() {
+    pipeline
+        .apply(Create.of(1))
+        .apply(ParDo.of(new SetupTeardownUsingDoFn(new 
ThrowsTimeoutException())))
+        .setCoder(VarIntCoder.of());
+
+    UncheckedExecutionException exception =
+        assertThrows(UncheckedExecutionException.class, pipeline::run);
+    UserCodeException userCodeException = (UserCodeException) 
exception.getCause();
+    assertEquals(UserCodeTimeoutException.class, 
userCodeException.getCause().getClass());
+  }
+
+  private static class SetupTeardownUsingDoFn extends DoFn<Integer, Integer> {
+    private final SetupTeardown setupTeardown;
+
+    private SetupTeardownUsingDoFn(SetupTeardown setupTeardown) {
+      this.setupTeardown = setupTeardown;
+    }
+
+    @Setup
+    public void setup() throws UserCodeExecutionException {
+      setupTeardown.setup();
+    }
+
+    @Teardown
+    public void teardown() throws UserCodeExecutionException {
+      setupTeardown.teardown();
+    }
+
+    @ProcessElement
+    public void process() {}
+  }
+
+  private static class SetupTeardownImpl implements SetupTeardown {
+    @Override
+    public void setup() throws UserCodeExecutionException {}
+
+    @Override
+    public void teardown() throws UserCodeExecutionException {}
+  }
+
+  private static class ThrowsQuotaException implements SetupTeardown {
+
+    @Override
+    public void setup() throws UserCodeExecutionException {
+      throw new UserCodeQuotaException("quota");
+    }
+
+    @Override
+    public void teardown() throws UserCodeExecutionException {}
+  }
+
+  private static class ThrowsTimeoutException implements SetupTeardown {
+
+    @Override
+    public void setup() throws UserCodeExecutionException {
+      throw new UserCodeTimeoutException("timeout");
+    }
+
+    @Override
+    public void teardown() throws UserCodeExecutionException {}
+  }
+}

Reply via email to