zhoufek commented on a change in pull request #14971:
URL: https://github.com/apache/beam/pull/14971#discussion_r649473094



##########
File path: 
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
##########
@@ -416,6 +423,96 @@ public void after() throws IOException {
     }
   }
 
+  @Test
+  public void testProto() {
+    ProtoCoder<Primitive> coder = ProtoCoder.of(Primitive.class);
+    ImmutableList<Primitive> inputs =
+        ImmutableList.of(
+            Primitive.newBuilder().setPrimitiveInt32(42).build(),
+            Primitive.newBuilder().setPrimitiveBool(true).build(),
+            Primitive.newBuilder().setPrimitiveString("Hello, 
World!").build());
+    setupTestClient(inputs, coder);
+    PCollection<Primitive> read =
+        readPipeline.apply(
+            PubsubIO.readProtos(Primitive.class)
+                .fromSubscription(SUBSCRIPTION.getPath())
+                .withClock(CLOCK)
+                .withClientFactory(clientFactory));
+    PAssert.that(read).containsInAnyOrder(inputs);
+    readPipeline.run();
+  }
+
+  @Test
+  public void testProtoDynamicMessage() {
+    ProtoCoder<Primitive> coder = ProtoCoder.of(Primitive.class);
+    ImmutableList<Primitive> inputs =
+        ImmutableList.of(
+            Primitive.newBuilder().setPrimitiveInt32(42).build(),
+            Primitive.newBuilder().setPrimitiveBool(true).build(),
+            Primitive.newBuilder().setPrimitiveString("Hello, 
World!").build());
+    setupTestClient(inputs, coder);
+
+    ProtoDomain domain = ProtoDomain.buildFrom(Primitive.getDescriptor());
+    String name = Primitive.getDescriptor().getFullName();
+    PCollection<Primitive> read =
+        readPipeline
+            .apply(
+                PubsubIO.readProtoDynamicMessage(domain, name)
+                    .fromSubscription(SUBSCRIPTION.getPath())
+                    .withClock(CLOCK)
+                    .withClientFactory(clientFactory))
+            // DynamicMessage doesn't work well with PAssert, but if the 
content can be successfully
+            // converted back into the original Primitive, then that should be 
good enough to
+            // consider it a successful read.
+            .apply(
+                "Return To Primitive",
+                MapElements.into(TypeDescriptor.of(Primitive.class))
+                    .via(
+                        (DynamicMessage message) -> {

Review comment:
       I think that, since this is just a test, it would be acceptable to not 
catch the exception. Whether we fail by not catching an except or by asserting 
the failures are empty when they aren't, the end result is essentially the same.
   
   Edit: But thanks for pointing out those exception methods for `MapElements`! 
They should be helpful.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to