This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ebdf6ba  Fixes null pointer exception on empty pubsub message data 
(#5016)
ebdf6ba is described below

commit ebdf6ba5f459cea5c1fe20c1df9bebc4df9cfeb5
Author: Matthew Jones <mle...@gmail.com>
AuthorDate: Wed Jun 20 11:41:35 2018 +1200

    Fixes null pointer exception on empty pubsub message data (#5016)
---
 .../beam/sdk/io/gcp/pubsub/PubsubJsonClient.java   |  3 +++
 .../sdk/io/gcp/pubsub/PubsubJsonClientTest.java    | 28 ++++++++++++++++++++++
 2 files changed, 31 insertions(+)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
index 1981787..b67bb43 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
@@ -185,6 +185,9 @@ public class PubsubJsonClient extends PubsubClient {
 
       // Payload.
       byte[] elementBytes = pubsubMessage.decodeData();
+      if (elementBytes == null) {
+        elementBytes = new byte[0];
+      }
 
       // Timestamp.
       long timestampMsSinceEpoch =
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
index 2922057..efd9de0 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.beam.sdk.io.gcp.pubsub;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 
 import com.google.api.services.pubsub.Pubsub;
@@ -112,6 +113,33 @@ public class PubsubJsonClientTest {
   }
 
   @Test
+  public void pullOneMessageWithNoData() throws IOException {
+    String expectedSubscription = SUBSCRIPTION.getPath();
+    PullRequest expectedRequest =
+            new PullRequest().setReturnImmediately(true).setMaxMessages(10);
+    PubsubMessage expectedPubsubMessage = new PubsubMessage()
+            .setMessageId(MESSAGE_ID)
+            .setPublishTime(String.valueOf(PUB_TIME))
+            .setAttributes(
+                    ImmutableMap.of(TIMESTAMP_ATTRIBUTE, 
String.valueOf(MESSAGE_TIME),
+                            ID_ATTRIBUTE, RECORD_ID));
+    ReceivedMessage expectedReceivedMessage =
+            new ReceivedMessage().setMessage(expectedPubsubMessage)
+                    .setAckId(ACK_ID);
+    PullResponse expectedResponse =
+            new 
PullResponse().setReceivedMessages(ImmutableList.of(expectedReceivedMessage));
+    Mockito.when((Object) (mockPubsub.projects()
+            .subscriptions()
+            .pull(expectedSubscription, expectedRequest)
+            .execute()))
+            .thenReturn(expectedResponse);
+    List<IncomingMessage> acutalMessages = client.pull(REQ_TIME, SUBSCRIPTION, 
10, true);
+    assertEquals(1, acutalMessages.size());
+    IncomingMessage actualMessage = acutalMessages.get(0);
+    assertArrayEquals(new byte[0], actualMessage.elementBytes);
+  }
+
+  @Test
   public void publishOneMessage() throws IOException {
     String expectedTopic = TOPIC.getPath();
     PubsubMessage expectedPubsubMessage = new PubsubMessage()

Reply via email to