This is an automated email from the ASF dual-hosted git repository. chamikara pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new ebdf6ba Fixes null pointer exception on empty pubsub message data (#5016) ebdf6ba is described below commit ebdf6ba5f459cea5c1fe20c1df9bebc4df9cfeb5 Author: Matthew Jones <mle...@gmail.com> AuthorDate: Wed Jun 20 11:41:35 2018 +1200 Fixes null pointer exception on empty pubsub message data (#5016) --- .../beam/sdk/io/gcp/pubsub/PubsubJsonClient.java | 3 +++ .../sdk/io/gcp/pubsub/PubsubJsonClientTest.java | 28 ++++++++++++++++++++++ 2 files changed, 31 insertions(+) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java index 1981787..b67bb43 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java @@ -185,6 +185,9 @@ public class PubsubJsonClient extends PubsubClient { // Payload. byte[] elementBytes = pubsubMessage.decodeData(); + if (elementBytes == null) { + elementBytes = new byte[0]; + } // Timestamp. long timestampMsSinceEpoch = diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java index 2922057..efd9de0 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io.gcp.pubsub; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import com.google.api.services.pubsub.Pubsub; @@ -112,6 +113,33 @@ public class PubsubJsonClientTest { } @Test + public void pullOneMessageWithNoData() throws IOException { + String expectedSubscription = SUBSCRIPTION.getPath(); + PullRequest expectedRequest = + new PullRequest().setReturnImmediately(true).setMaxMessages(10); + PubsubMessage expectedPubsubMessage = new PubsubMessage() + .setMessageId(MESSAGE_ID) + .setPublishTime(String.valueOf(PUB_TIME)) + .setAttributes( + ImmutableMap.of(TIMESTAMP_ATTRIBUTE, String.valueOf(MESSAGE_TIME), + ID_ATTRIBUTE, RECORD_ID)); + ReceivedMessage expectedReceivedMessage = + new ReceivedMessage().setMessage(expectedPubsubMessage) + .setAckId(ACK_ID); + PullResponse expectedResponse = + new PullResponse().setReceivedMessages(ImmutableList.of(expectedReceivedMessage)); + Mockito.when((Object) (mockPubsub.projects() + .subscriptions() + .pull(expectedSubscription, expectedRequest) + .execute())) + .thenReturn(expectedResponse); + List<IncomingMessage> acutalMessages = client.pull(REQ_TIME, SUBSCRIPTION, 10, true); + assertEquals(1, acutalMessages.size()); + IncomingMessage actualMessage = acutalMessages.get(0); + assertArrayEquals(new byte[0], actualMessage.elementBytes); + } + + @Test public void publishOneMessage() throws IOException { String expectedTopic = TOPIC.getPath(); PubsubMessage expectedPubsubMessage = new PubsubMessage()