Fixed adding timestamp and id attributes to pubsub messages
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f77a8580 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f77a8580 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f77a8580 Branch: refs/heads/tez-runner Commit: f77a8580700b1cd79e0c824f171cccd8e2f332b3 Parents: 30886ac Author: Nigel Kilmer <nkil...@google.com> Authored: Mon Jul 17 18:09:57 2017 -0700 Committer: Ahmet Altay <al...@google.com> Committed: Wed Nov 15 15:44:23 2017 -0800 ---------------------------------------------------------------------- .../sdk/io/gcp/pubsub/PubsubJsonClient.java | 34 ++++++------- .../sdk/io/gcp/pubsub/PubsubJsonClientTest.java | 53 ++++++++++++++++++++ 2 files changed, 70 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/f77a8580/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java index b745422..ab08813 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java @@ -135,23 +135,7 @@ class PubsubJsonClient extends PubsubClient { List<PubsubMessage> pubsubMessages = new ArrayList<>(outgoingMessages.size()); for (OutgoingMessage outgoingMessage : outgoingMessages) { PubsubMessage pubsubMessage = new PubsubMessage().encodeData(outgoingMessage.elementBytes); - - Map<String, String> attributes = outgoingMessage.attributes; - if ((timestampAttribute != null || idAttribute != null) && attributes == null) { - attributes = new TreeMap<>(); - } - if (attributes != null) { - pubsubMessage.setAttributes(attributes); - } - - if (timestampAttribute != null) { - attributes.put(timestampAttribute, String.valueOf(outgoingMessage.timestampMsSinceEpoch)); - } - - if (idAttribute != null && !Strings.isNullOrEmpty(outgoingMessage.recordId)) { - attributes.put(idAttribute, outgoingMessage.recordId); - } - + pubsubMessage.setAttributes(getMessageAttributes(outgoingMessage)); pubsubMessages.add(pubsubMessage); } PublishRequest request = new PublishRequest().setMessages(pubsubMessages); @@ -162,6 +146,22 @@ class PubsubJsonClient extends PubsubClient { return response.getMessageIds().size(); } + private Map<String, String> getMessageAttributes(OutgoingMessage outgoingMessage) { + Map<String, String> attributes = null; + if (outgoingMessage.attributes == null) { + attributes = new TreeMap<>(); + } else { + attributes = new TreeMap<>(outgoingMessage.attributes); + } + if (timestampAttribute != null) { + attributes.put(timestampAttribute, String.valueOf(outgoingMessage.timestampMsSinceEpoch)); + } + if (idAttribute != null && !Strings.isNullOrEmpty(outgoingMessage.recordId)) { + attributes.put(idAttribute, outgoingMessage.recordId); + } + return attributes; + } + @Override public List<IncomingMessage> pull( long requestTimeMsSinceEpoch, http://git-wip-us.apache.org/repos/asf/beam/blob/f77a8580/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java index 578f814..cbb24f2 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java @@ -136,4 +136,57 @@ public class PubsubJsonClientTest { int n = client.publish(TOPIC, ImmutableList.of(actualMessage)); assertEquals(1, n); } + + @Test + public void publishOneMessageWithOnlyTimestampAndIdAttributes() throws IOException { + String expectedTopic = TOPIC.getPath(); + PubsubMessage expectedPubsubMessage = new PubsubMessage() + .encodeData(DATA.getBytes()) + .setAttributes( + ImmutableMap.<String, String> builder() + .put(TIMESTAMP_ATTRIBUTE, String.valueOf(MESSAGE_TIME)) + .put(ID_ATTRIBUTE, RECORD_ID).build()); + PublishRequest expectedRequest = new PublishRequest() + .setMessages(ImmutableList.of(expectedPubsubMessage)); + PublishResponse expectedResponse = new PublishResponse() + .setMessageIds(ImmutableList.of(MESSAGE_ID)); + Mockito.when((Object) (mockPubsub.projects() + .topics() + .publish(expectedTopic, expectedRequest) + .execute())) + .thenReturn(expectedResponse); + OutgoingMessage actualMessage = new OutgoingMessage( + DATA.getBytes(), ImmutableMap.<String, String>of(), MESSAGE_TIME, RECORD_ID); + int n = client.publish(TOPIC, ImmutableList.of(actualMessage)); + assertEquals(1, n); + } + + @Test + public void publishOneMessageWithNoTimestampOrIdAttribute() throws IOException { + // For this test, create a new PubsubJsonClient without the timestamp attribute + // or id attribute set. + client = new PubsubJsonClient(null, null, mockPubsub); + + String expectedTopic = TOPIC.getPath(); + PubsubMessage expectedPubsubMessage = new PubsubMessage() + .encodeData(DATA.getBytes()) + .setAttributes( + ImmutableMap.<String, String> builder() + .put("k", "v").build()); + PublishRequest expectedRequest = new PublishRequest() + .setMessages(ImmutableList.of(expectedPubsubMessage)); + PublishResponse expectedResponse = new PublishResponse() + .setMessageIds(ImmutableList.of(MESSAGE_ID)); + Mockito.when((Object) (mockPubsub.projects() + .topics() + .publish(expectedTopic, expectedRequest) + .execute())) + .thenReturn(expectedResponse); + Map<String, String> attrs = new HashMap<>(); + attrs.put("k", "v"); + OutgoingMessage actualMessage = new OutgoingMessage( + DATA.getBytes(), attrs, MESSAGE_TIME, RECORD_ID); + int n = client.publish(TOPIC, ImmutableList.of(actualMessage)); + assertEquals(1, n); + } }