kennknowles commented on a change in pull request #14971:
URL: https://github.com/apache/beam/pull/14971#discussion_r648422185
##########
File path:
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java
##########
@@ -416,6 +423,96 @@ public void after() throws IOException {
}
}
+ @Test
+ public void testProto() {
+ ProtoCoder<Primitive> coder = ProtoCoder.of(Primitive.class);
+ ImmutableList<Primitive> inputs =
+ ImmutableList.of(
+ Primitive.newBuilder().setPrimitiveInt32(42).build(),
+ Primitive.newBuilder().setPrimitiveBool(true).build(),
+ Primitive.newBuilder().setPrimitiveString("Hello, World!").build());
+ setupTestClient(inputs, coder);
+ PCollection<Primitive> read =
+ readPipeline.apply(
+ PubsubIO.readProtos(Primitive.class)
+ .fromSubscription(SUBSCRIPTION.getPath())
+ .withClock(CLOCK)
+ .withClientFactory(clientFactory));
+ PAssert.that(read).containsInAnyOrder(inputs);
+ readPipeline.run();
+ }
+
+ @Test
+ public void testProtoDynamicMessage() {
+ ProtoCoder<Primitive> coder = ProtoCoder.of(Primitive.class);
+ ImmutableList<Primitive> inputs =
+ ImmutableList.of(
+ Primitive.newBuilder().setPrimitiveInt32(42).build(),
+ Primitive.newBuilder().setPrimitiveBool(true).build(),
+ Primitive.newBuilder().setPrimitiveString("Hello, World!").build());
+ setupTestClient(inputs, coder);
+
+ ProtoDomain domain = ProtoDomain.buildFrom(Primitive.getDescriptor());
+ String name = Primitive.getDescriptor().getFullName();
+ PCollection<Primitive> read =
+ readPipeline
+ .apply(
+ PubsubIO.readProtoDynamicMessage(domain, name)
+ .fromSubscription(SUBSCRIPTION.getPath())
+ .withClock(CLOCK)
+ .withClientFactory(clientFactory))
+ // DynamicMessage doesn't work well with PAssert, but if the content can be successfully
+ // converted back into the original Primitive, then that should be good enough to
+ // consider it a successful read.
+ .apply(
+ "Return To Primitive",
+ MapElements.into(TypeDescriptor.of(Primitive.class))
+ .via(
+ (DynamicMessage message) -> {
Review comment:
This boilerplate makes me think that `MapElements` could also benefit
from a variant with an output for failed messages. (not actionable for your PR,
just a comment)
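To make the idea concrete, here is a minimal sketch in plain Java (no Beam dependency) of a map step that collects failed inputs in a second output instead of throwing. The names `MapWithFailuresSketch`, `Result`, and `mapWithFailures` are hypothetical and only illustrate the shape; they are not Beam APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class MapWithFailuresSketch {
  /** Holds the successfully mapped outputs and the inputs that failed. */
  public static final class Result<InT, OutT> {
    public final List<OutT> outputs = new ArrayList<>();
    public final List<InT> failures = new ArrayList<>();
  }

  /** Applies fn to each input; inputs whose mapping throws go to the failure list. */
  public static <InT, OutT> Result<InT, OutT> mapWithFailures(
      List<InT> inputs, Function<InT, OutT> fn) {
    Result<InT, OutT> result = new Result<>();
    for (InT input : inputs) {
      try {
        result.outputs.add(fn.apply(input));
      } catch (RuntimeException e) {
        // Keep the offending input so a caller can inspect or reroute it.
        result.failures.add(input);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Result<String, Integer> r =
        mapWithFailures(Arrays.asList("1", "two", "3"), Integer::parseInt);
    System.out.println(r.outputs);
    System.out.println(r.failures);
  }
}
```

With a shape like this, the test's conversion lambda would not need its own try/catch; it could assert that the failure output is empty.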
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
##########
@@ -483,6 +488,54 @@ public String toString() {
return Read.newBuilder(parsePayloadUsingCoder(coder)).setCoder(coder).build();
}
+ /**
+ * Returns a {@link PTransform} that continuously reads binary encoded protobuf messages for the
+ * type specified by {@code fullMessageName}.
+ *
+ * <p>This is primarily here for cases where the message type cannot be known at compile time. If
+ * it can be known, prefer {@link PubsubIO#readProtos(Class)}, as {@link DynamicMessage} tends to
+ * perform worse than concrete types.
+ *
+ * <p>Beam will infer a schema for the {@link DynamicMessage} schema. Note that some proto schema
+ * features are not supported by all sinks.
+ *
+ * @param domain The {@link ProtoDomain} that contains the target message and its dependencies.
+ * @param fullMessageName The full name of the message for lookup in {@code domain}.
+ */
+ @Experimental(Kind.SCHEMAS)
+ public static Read<DynamicMessage> readProtoDynamicMessage(
+ ProtoDomain domain, String fullMessageName) {
+ SerializableFunction<PubsubMessage, DynamicMessage> parser =
+ message -> {
+ try {
+ return DynamicMessage.parseFrom(
+ domain.getDescriptor(fullMessageName), message.getPayload());
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException("Could not parse Pub/Sub message", e);
Review comment:
Does PubsubIO have a dead-letter capability that will catch this error
and output unparseable messages to some secondary output? Can users turn that
capability on after the `Read` transform is returned by this method?
(I'm not opening up the code and reading, because I just want to get your
review back to you and I'm sure you know the answer and will do the right thing)
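For illustration only, the alternative to rethrowing could look roughly like the plain-Java sketch below: the parser returns a per-element result that carries either the parsed value or the raw payload plus the error, so a later step can route failures to a dead-letter output. `DeadLetterSketch` and `ParseResult` are hypothetical names, and strict UTF-8 decoding stands in for `DynamicMessage.parseFrom` as a parser that throws a checked exception; none of this is existing PubsubIO behavior.

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.StandardCharsets;

public class DeadLetterSketch {
  /** Either a parsed value, or the raw payload together with the parse error. */
  public static final class ParseResult<T> {
    public final T value;           // non-null only on success
    public final byte[] rawPayload; // non-null only on failure
    public final Exception error;   // non-null only on failure

    private ParseResult(T value, byte[] rawPayload, Exception error) {
      this.value = value;
      this.rawPayload = rawPayload;
      this.error = error;
    }

    public boolean isOk() {
      return error == null;
    }
  }

  /** Stand-in parser (assumption): strict UTF-8 decoding that never throws to the caller. */
  public static ParseResult<String> parse(byte[] payload) {
    try {
      String text =
          StandardCharsets.UTF_8.newDecoder().decode(ByteBuffer.wrap(payload)).toString();
      return new ParseResult<>(text, null, null);
    } catch (CharacterCodingException e) {
      // Keep the original bytes so they can be written to a dead-letter destination.
      return new ParseResult<>(null, payload, e);
    }
  }

  public static void main(String[] args) {
    ParseResult<String> ok = parse("hello".getBytes(StandardCharsets.UTF_8));
    ParseResult<String> bad = parse(new byte[] {(byte) 0xFF}); // invalid UTF-8
    System.out.println(ok.isOk());
    System.out.println(bad.isOk());
  }
}
```

Whether something like this can be enabled after `readProtoDynamicMessage` has already returned its `Read` is exactly the open question above; the sketch only shows the element-level shape.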