This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 01c1cfe  [BEAM-5496] Fixes bug of MqttIO fails to deserialize 
checkpoint (#6701)
01c1cfe is described below

commit 01c1cfef6abf72ac1618684f5c83ffd9699764fc
Author: flyisland <[email protected]>
AuthorDate: Thu Oct 25 09:48:17 2018 +0800

    [BEAM-5496] Fixes bug of MqttIO fails to deserialize checkpoint (#6701)
---
 .../java/org/apache/beam/sdk/io/mqtt/MqttIO.java   | 29 +++++++++++++++++++---
 .../org/apache/beam/sdk/io/mqtt/MqttIOTest.java    | 24 ++++++++++++++++++
 2 files changed, 50 insertions(+), 3 deletions(-)

diff --git 
a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java 
b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
index 2d7734e..cc9c8ad 100644
--- a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
+++ b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
@@ -28,6 +28,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.NoSuchElementException;
+import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
@@ -305,12 +306,16 @@ public class MqttIO {
   @VisibleForTesting
   static class MqttCheckpointMark implements UnboundedSource.CheckpointMark, 
Serializable {
 
-    private String clientId;
-    private Instant oldestMessageTimestamp = Instant.now();
-    private transient List<Message> messages = new ArrayList<>();
+    @VisibleForTesting String clientId;
+    @VisibleForTesting Instant oldestMessageTimestamp = Instant.now();
+    @VisibleForTesting transient List<Message> messages = new ArrayList<>();
 
     public MqttCheckpointMark() {}
 
+    public MqttCheckpointMark(String id) {
+      clientId = id;
+    }
+
     public void add(Message message, Instant timestamp) {
       if (timestamp.isBefore(oldestMessageTimestamp)) {
         oldestMessageTimestamp = timestamp;
@@ -335,8 +340,26 @@ public class MqttIO {
     // set an empty list to messages when deserialize
     private void readObject(java.io.ObjectInputStream stream)
         throws IOException, ClassNotFoundException {
+      stream.defaultReadObject();
       messages = new ArrayList<>();
     }
+
+    @Override
+    public boolean equals(Object other) {
+      if (other instanceof MqttCheckpointMark) {
+        MqttCheckpointMark that = (MqttCheckpointMark) other;
+        return Objects.equals(this.clientId, that.clientId)
+            && Objects.equals(this.oldestMessageTimestamp, 
that.oldestMessageTimestamp)
+            && Objects.deepEquals(this.messages, that.messages);
+      } else {
+        return false;
+      }
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(clientId, oldestMessageTimestamp, messages);
+    }
   }
 
   @VisibleForTesting
diff --git 
a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java 
b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
index 05c07c9..b4fd2da 100644
--- 
a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
+++ 
b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
@@ -20,10 +20,15 @@ package org.apache.beam.sdk.io.mqtt;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.net.ServerSocket;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentSkipListSet;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.Connection;
@@ -263,6 +268,25 @@ public class MqttIOTest {
     }
   }
 
+  @Test
+  public void testReadObject() throws Exception {
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    ObjectOutputStream out = new ObjectOutputStream(bos);
+    MqttIO.MqttCheckpointMark cp1 = new 
MqttIO.MqttCheckpointMark(UUID.randomUUID().toString());
+    out.writeObject(cp1);
+
+    ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
+    ObjectInputStream in = new ObjectInputStream(bis);
+    MqttIO.MqttCheckpointMark cp2 = (MqttIO.MqttCheckpointMark) 
in.readObject();
+
+    // there should be no bytes left in the stream
+    assertEquals(0, in.available());
+    // the number of messages of the decoded checkpoint should be zero
+    assertEquals(0, cp2.messages.size());
+    assertEquals(cp1.clientId, cp2.clientId);
+    assertEquals(cp1.oldestMessageTimestamp, cp2.oldestMessageTimestamp);
+  }
+
   @After
   public void stopBroker() throws Exception {
     if (brokerService != null) {

Reply via email to