This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 01c1cfe [BEAM-5496] Fixes bug of MqttIO fails to deserialize
checkpoint (#6701)
01c1cfe is described below
commit 01c1cfef6abf72ac1618684f5c83ffd9699764fc
Author: flyisland <[email protected]>
AuthorDate: Thu Oct 25 09:48:17 2018 +0800
[BEAM-5496] Fixes bug of MqttIO fails to deserialize checkpoint (#6701)
---
.../java/org/apache/beam/sdk/io/mqtt/MqttIO.java | 29 +++++++++++++++++++---
.../org/apache/beam/sdk/io/mqtt/MqttIOTest.java | 24 ++++++++++++++++++
2 files changed, 50 insertions(+), 3 deletions(-)
diff --git
a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
index 2d7734e..cc9c8ad 100644
--- a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
+++ b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
@@ -28,6 +28,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
+import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
@@ -305,12 +306,16 @@ public class MqttIO {
@VisibleForTesting
static class MqttCheckpointMark implements UnboundedSource.CheckpointMark,
Serializable {
- private String clientId;
- private Instant oldestMessageTimestamp = Instant.now();
- private transient List<Message> messages = new ArrayList<>();
+ @VisibleForTesting String clientId;
+ @VisibleForTesting Instant oldestMessageTimestamp = Instant.now();
+ @VisibleForTesting transient List<Message> messages = new ArrayList<>();
public MqttCheckpointMark() {}
+ public MqttCheckpointMark(String id) {
+ clientId = id;
+ }
+
public void add(Message message, Instant timestamp) {
if (timestamp.isBefore(oldestMessageTimestamp)) {
oldestMessageTimestamp = timestamp;
@@ -335,8 +340,26 @@ public class MqttIO {
// set an empty list to messages when deserialize
private void readObject(java.io.ObjectInputStream stream)
throws IOException, ClassNotFoundException {
+ stream.defaultReadObject();
messages = new ArrayList<>();
}
+
+ @Override
+ public boolean equals(Object other) {
+ if (other instanceof MqttCheckpointMark) {
+ MqttCheckpointMark that = (MqttCheckpointMark) other;
+ return Objects.equals(this.clientId, that.clientId)
+ && Objects.equals(this.oldestMessageTimestamp,
that.oldestMessageTimestamp)
+ && Objects.deepEquals(this.messages, that.messages);
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(clientId, oldestMessageTimestamp, messages);
+ }
}
@VisibleForTesting
diff --git
a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
index 05c07c9..b4fd2da 100644
---
a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
+++
b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
@@ -20,10 +20,15 @@ package org.apache.beam.sdk.io.mqtt;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.net.ServerSocket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.ConcurrentSkipListSet;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.Connection;
@@ -263,6 +268,25 @@ public class MqttIOTest {
}
}
+ @Test
+ public void testReadObject() throws Exception {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutputStream out = new ObjectOutputStream(bos);
+ MqttIO.MqttCheckpointMark cp1 = new
MqttIO.MqttCheckpointMark(UUID.randomUUID().toString());
+ out.writeObject(cp1);
+
+ ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
+ ObjectInputStream in = new ObjectInputStream(bis);
+ MqttIO.MqttCheckpointMark cp2 = (MqttIO.MqttCheckpointMark)
in.readObject();
+
+ // there should be no bytes left in the stream
+ assertEquals(0, in.available());
+ // the number of messages of the decoded checkpoint should be zero
+ assertEquals(0, cp2.messages.size());
+ assertEquals(cp1.clientId, cp2.clientId);
+ assertEquals(cp1.oldestMessageTimestamp, cp2.oldestMessageTimestamp);
+ }
+
@After
public void stopBroker() throws Exception {
if (brokerService != null) {