This is an automated email from the ASF dual-hosted git repository. toulmean pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git
The following commit(s) were added to refs/heads/main by this push: new 46291031c better handling of attributes in plumtree - make it a map new 6e7f92a29 Merge pull request #500 from atoulme/better_attributes 46291031c is described below commit 46291031cea04bf01c1639eec0f52c64372e5d0e Author: Antoine Toulme <anto...@lunar-ocean.com> AuthorDate: Sat Jan 28 22:23:41 2023 -0800 better handling of attributes in plumtree - make it a map --- .../java/org/apache/tuweni/gossip/GossipApp.java | 2 +- .../tuweni/plumtree/servlet/GossipServletTest.java | 62 +++++++-------- .../tuweni/plumtree/servlet/GossipServlet.java | 93 ++++++++++++++++------ .../plumtree/vertx/VertxGossipServerTest.java | 18 +++-- .../tuweni/plumtree/vertx/VertxGossipServer.java | 58 +++++++++++--- .../{MessageHashing.java => MessageIdentity.java} | 10 ++- .../apache/tuweni/plumtree/MessageListener.java | 4 +- .../org/apache/tuweni/plumtree/MessageSender.java | 3 +- .../java/org/apache/tuweni/plumtree/State.java | 28 +++---- .../java/org/apache/tuweni/plumtree/StateTest.java | 23 ++++-- 10 files changed, 203 insertions(+), 98 deletions(-) diff --git a/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java b/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java index 0d6509c5a..a14a80681 100644 --- a/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java +++ b/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java @@ -235,6 +235,6 @@ public final class GossipApp { } public void publish(Bytes message) { - server.gossip("", message); + server.gossip(Collections.emptyMap(), message); } } diff --git a/plumtree-servlet/src/integrationTest/java/org/apache/tuweni/plumtree/servlet/GossipServletTest.java b/plumtree-servlet/src/integrationTest/java/org/apache/tuweni/plumtree/servlet/GossipServletTest.java index 613fbbe2f..0dc8bcb22 100644 --- a/plumtree-servlet/src/integrationTest/java/org/apache/tuweni/plumtree/servlet/GossipServletTest.java +++ b/plumtree-servlet/src/integrationTest/java/org/apache/tuweni/plumtree/servlet/GossipServletTest.java @@ -21,6 +21,10 @@ import org.apache.tuweni.plumtree.EphemeralPeerRepository; import org.apache.tuweni.plumtree.MessageListener; import org.apache.tuweni.plumtree.Peer; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.Map; + import org.eclipse.jetty.server.Server; import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; @@ -33,7 +37,7 @@ public class GossipServletTest { public Bytes message; @Override - public void listen(Bytes messageBody, String attributes, Peer peer) { + public void listen(Bytes messageBody, Map<String, Bytes> attributes, Peer peer) { message = messageBody; } } @@ -59,8 +63,7 @@ public class GossipServletTest { 200, 200, bytes -> bytes, - "127.0.0.1", - 10000, + "http://127.0.0.1:10000", messageReceived1, (message, peer) -> true, null, @@ -69,8 +72,7 @@ public class GossipServletTest { 200, 200, bytes -> bytes, - "127.0.0.1", - 10001, + "http://127.0.0.1:10001", messageReceived2, (message, peer) -> true, null, @@ -83,8 +85,9 @@ public class GossipServletTest { server1.start(); server2.start(); - gossipServlet.connectTo("127.0.0.1", 10001).join(); - String attributes = "{\"message_type\": \"BLOCK\"}"; + gossipServlet.connectTo("http://127.0.0.1:10001").join(); + Map<String, Bytes> attributes = + Collections.singletonMap("message_type", Bytes.wrap("BLOCK".getBytes(StandardCharsets.UTF_8))); gossipServlet.gossip(attributes, Bytes.fromHexString("deadbeef")); for (int i = 0; i < 10; i++) { Thread.sleep(500); @@ -107,8 +110,7 @@ public class GossipServletTest { 200, 200, bytes -> bytes, - "127.0.0.1", - 10000, + "http://127.0.0.1:10000", messageReceived1, (message, peer) -> true, null, @@ -117,8 +119,7 @@ public class GossipServletTest { 200, 200, bytes -> bytes, - "127.0.0.1", - 10001, + "http://127.0.0.1:10001", messageReceived2, (message, peer) -> true, null, @@ -127,8 +128,7 @@ public class GossipServletTest { 200, 200, bytes -> bytes, - "127.0.0.1", - 10002, + "http://127.0.0.1:10002", messageReceived3, (message, peer) -> true, null, @@ -143,9 +143,10 @@ public class GossipServletTest { server2.start(); server3.start(); - gossipServlet.connectTo("127.0.0.1", 10001).join(); - gossipServlet3.connectTo("127.0.0.1", 10001).join(); - String attributes = "{\"message_type\": \"BLOCK\"}"; + gossipServlet.connectTo("http://127.0.0.1:10001").join(); + gossipServlet3.connectTo("http://127.0.0.1:10001").join(); + Map<String, Bytes> attributes = + Collections.singletonMap("message_type", Bytes.wrap("BLOCK".getBytes(StandardCharsets.UTF_8))); gossipServlet.gossip(attributes, Bytes.fromHexString("deadbeef")); for (int i = 0; i < 10; i++) { Thread.sleep(500); @@ -175,8 +176,7 @@ public class GossipServletTest { 200, 200, bytes -> bytes, - "127.0.0.1", - 10000, + "http://127.0.0.1:10000", messageReceived1, (message, peer) -> true, null, @@ -185,8 +185,7 @@ public class GossipServletTest { 200, 200, bytes -> bytes, - "127.0.0.1", - 10001, + "http://127.0.0.1:10001", messageReceived2, (message, peer) -> true, null, @@ -195,8 +194,7 @@ public class GossipServletTest { 200, 200, bytes -> bytes, - "127.0.0.1", - 10002, + "http://127.0.0.1:10002", messageReceived2, (message, peer) -> true, null, @@ -212,12 +210,13 @@ public class GossipServletTest { try { - gossipServlet.connectTo("127.0.0.1", 10001).join(); - gossipServlet2.connectTo("127.0.0.1", 10002).join(); - gossipServlet.connectTo("127.0.0.1", 10002).join(); + gossipServlet.connectTo("http://127.0.0.1:10001").join(); + gossipServlet2.connectTo("http://127.0.0.1:10002").join(); + gossipServlet.connectTo("http://127.0.0.1:10002").join(); assertEquals(2, peerRepository1.eagerPushPeers().size()); - String attributes = "{\"message_type\": \"BLOCK\"}"; + Map<String, Bytes> attributes = + Collections.singletonMap("message_type", Bytes.wrap("BLOCK".getBytes(StandardCharsets.UTF_8))); gossipServlet.gossip(attributes, Bytes.fromHexString("deadbeef")); Thread.sleep(1000); assertEquals(Bytes.fromHexString("deadbeef"), messageReceived2.message); @@ -243,8 +242,7 @@ public class GossipServletTest { 200, 200, bytes -> bytes, - "127.0.0.1", - 10000, + "http://127.0.0.1:10000", messageReceived1, (message, peer) -> true, null, @@ -253,8 +251,7 @@ public class GossipServletTest { 200, 200, bytes -> bytes, - "127.0.0.1", - 10001, + "http://127.0.0.1:10001", messageReceived2, (message, peer) -> true, null, @@ -266,9 +263,10 @@ public class GossipServletTest { server1.start(); server2.start(); - gossipServlet.connectTo("127.0.0.1", 10001).join(); + gossipServlet.connectTo("http://127.0.0.1:10001").join(); assertEquals(1, peerRepository1.eagerPushPeers().size()); - String attributes = "{\"message_type\": \"BLOCK\"}"; + Map<String, Bytes> attributes = + Collections.singletonMap("message_type", Bytes.wrap("BLOCK".getBytes(StandardCharsets.UTF_8))); gossipServlet.send(peerRepository1.peers().iterator().next(), attributes, Bytes.fromHexString("deadbeef")); Thread.sleep(1000); assertEquals(Bytes.fromHexString("deadbeef"), messageReceived2.message); diff --git a/plumtree-servlet/src/main/java/org/apache/tuweni/plumtree/servlet/GossipServlet.java b/plumtree-servlet/src/main/java/org/apache/tuweni/plumtree/servlet/GossipServlet.java index 4258406c4..f7aa722fa 100644 --- a/plumtree-servlet/src/main/java/org/apache/tuweni/plumtree/servlet/GossipServlet.java +++ b/plumtree-servlet/src/main/java/org/apache/tuweni/plumtree/servlet/GossipServlet.java @@ -20,7 +20,7 @@ import org.apache.hc.core5.http.io.entity.ByteArrayEntity; import org.apache.tuweni.bytes.Bytes; import org.apache.tuweni.concurrent.AsyncCompletion; import org.apache.tuweni.concurrent.CompletableAsyncCompletion; -import org.apache.tuweni.plumtree.MessageHashing; +import org.apache.tuweni.plumtree.MessageIdentity; import org.apache.tuweni.plumtree.MessageListener; import org.apache.tuweni.plumtree.MessageSender; import org.apache.tuweni.plumtree.MessageValidator; @@ -31,10 +31,18 @@ import org.apache.tuweni.plumtree.State; import java.io.IOException; import java.io.UncheckedIOException; +import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.deser.std.StdDeserializer; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.fasterxml.jackson.databind.ser.std.StdSerializer; import jakarta.servlet.ServletConfig; import jakarta.servlet.ServletException; import jakarta.servlet.http.HttpServlet; @@ -50,14 +58,51 @@ public class GossipServlet extends HttpServlet { private static final Logger logger = LoggerFactory.getLogger(GossipServlet.class); private static final ObjectMapper mapper = new ObjectMapper(); - private void sendMessage(MessageSender.Verb verb, String attributes, Peer peer, Bytes hash, Bytes payload) { + private final static class BytesSerializer extends StdSerializer<Bytes> { + + public BytesSerializer() { + super(Bytes.class); + } + + @Override + public void serialize(Bytes value, JsonGenerator gen, SerializerProvider provider) throws IOException { + gen.writeString(value.toHexString()); + } + } + static class BytesDeserializer extends StdDeserializer<Bytes> { + + BytesDeserializer() { + super(Bytes.class); + } + + @Override + public Bytes deserialize(JsonParser p, DeserializationContext ctxt) throws IOException { + String value = p.getValueAsString(); + return Bytes.fromHexStringLenient(value); + } + + } + + static { + SimpleModule module = new SimpleModule(); + module.addSerializer(Bytes.class, new BytesSerializer()); + module.addDeserializer(Bytes.class, new BytesDeserializer()); + mapper.registerModule(module); + } + + private void sendMessage( + MessageSender.Verb verb, + Map<String, Bytes> attributes, + Peer peer, + Bytes hash, + Bytes payload) { Message message = new Message(); message.verb = verb; message.attributes = attributes; message.hash = hash.toHexString(); message.payload = payload == null ? null : payload.toHexString(); - HttpPost postMessage = new HttpPost("http://" + ((ServletPeer) peer).getAddress()); - postMessage.setHeader(PLUMTREE_SERVER_HEADER, this.networkInterface + ":" + this.port); + HttpPost postMessage = new HttpPost(((ServletPeer) peer).getAddress()); + postMessage.setHeader(PLUMTREE_SERVER_HEADER, this.externalURL); try { ByteArrayEntity entity = new ByteArrayEntity(mapper.writeValueAsBytes(message), ContentType.APPLICATION_JSON); postMessage.setEntity(entity); @@ -76,17 +121,17 @@ public class GossipServlet extends HttpServlet { } private static final class Message { + public MessageSender.Verb verb; - public String attributes; + public Map<String, Bytes> attributes; public String hash; public String payload; } private final int graftDelay; private final int lazyQueueInterval; - private final MessageHashing messageHashing; - private final String networkInterface; - private final int port; + private final MessageIdentity messageIdentity; + private final String externalURL; private final MessageListener payloadListener; private final MessageValidator payloadValidator; private final PeerPruning peerPruningFunction; @@ -98,18 +143,16 @@ public class GossipServlet extends HttpServlet { public GossipServlet( int graftDelay, int lazyQueueInterval, - MessageHashing messageHashing, - String networkInterface, - int port, + MessageIdentity messageIdentity, + String externalURL, MessageListener payloadListener, MessageValidator payloadValidator, PeerPruning peerPruningFunction, PeerRepository peerRepository) { this.graftDelay = graftDelay; this.lazyQueueInterval = lazyQueueInterval; - this.messageHashing = messageHashing; - this.networkInterface = networkInterface; - this.port = port; + this.messageIdentity = messageIdentity; + this.externalURL = externalURL; this.payloadListener = payloadListener; this.payloadValidator = payloadValidator == null ? (bytes, peer) -> true : payloadValidator; this.peerPruningFunction = peerPruningFunction == null ? (peer) -> true : peerPruningFunction; @@ -125,7 +168,7 @@ public class GossipServlet extends HttpServlet { httpclient = HttpClients.createDefault(); state = new State( peerRepository, - messageHashing, + messageIdentity, this::sendMessage, payloadListener, payloadValidator, @@ -200,7 +243,7 @@ public class GossipServlet extends HttpServlet { * @param attributes the payload to propagate * @param message the payload to propagate */ - public void gossip(String attributes, Bytes message) { + public void gossip(Map<String, Bytes> attributes, Bytes message) { if (!started.get()) { throw new IllegalStateException("Server has not started"); } @@ -214,14 +257,14 @@ public class GossipServlet extends HttpServlet { * @param attributes the payload to propagate * @param message the payload to propagate */ - public void send(Peer peer, String attributes, Bytes message) { + public void send(Peer peer, Map<String, Bytes> attributes, Bytes message) { if (!started.get()) { throw new IllegalStateException("Server has not started"); } state.sendMessage(peer, attributes, message); } - public AsyncCompletion connectTo(String host, int port) { + public AsyncCompletion connectTo(String url) { if (!started.get()) { throw new IllegalStateException("Server has not started"); } @@ -229,22 +272,22 @@ public class GossipServlet extends HttpServlet { CompletableAsyncCompletion completion = AsyncCompletion.incomplete(); AtomicInteger counter = new AtomicInteger(0); - roundConnect(host, port, counter, completion); + roundConnect(url, counter, completion); return completion; } - private void roundConnect(String host, int port, AtomicInteger counter, CompletableAsyncCompletion completion) { - ServletPeer peer = new ServletPeer(host + ":" + port); - HttpPost postMessage = new HttpPost("http://" + peer.getAddress()); - postMessage.setHeader(PLUMTREE_SERVER_HEADER, this.networkInterface + ":" + this.port); + private void roundConnect(String url, AtomicInteger counter, CompletableAsyncCompletion completion) { + ServletPeer peer = new ServletPeer(url); + HttpPost postMessage = new HttpPost(peer.getAddress()); + postMessage.setHeader(PLUMTREE_SERVER_HEADER, this.externalURL); try { httpclient.execute(postMessage, response -> { if (response.getCode() > 299) { if (counter.incrementAndGet() > 5) { completion.completeExceptionally(new RuntimeException(response.getEntity().toString())); } else { - roundConnect(host, port, counter, completion); + roundConnect(url, counter, completion); } } else { state.addPeer(peer); @@ -256,7 +299,7 @@ public class GossipServlet extends HttpServlet { if (counter.incrementAndGet() > 5) { completion.completeExceptionally(e); } else { - roundConnect(host, port, counter, completion); + roundConnect(url, counter, completion); } } } diff --git a/plumtree-vertx/src/integrationTest/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java b/plumtree-vertx/src/integrationTest/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java index 7f64843af..d16b98fd7 100644 --- a/plumtree-vertx/src/integrationTest/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java +++ b/plumtree-vertx/src/integrationTest/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java @@ -23,6 +23,10 @@ import org.apache.tuweni.plumtree.EphemeralPeerRepository; import org.apache.tuweni.plumtree.MessageListener; import org.apache.tuweni.plumtree.Peer; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.Map; + import io.vertx.core.Vertx; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -35,7 +39,7 @@ class VertxGossipServerTest { public Bytes message; @Override - public void listen(Bytes messageBody, String attributes, Peer peer) { + public void listen(Bytes messageBody, Map<String, Bytes> attributes, Peer peer) { message = messageBody; } } @@ -73,7 +77,8 @@ class VertxGossipServerTest { server2.start().join(); server1.connectTo("127.0.0.1", 10001).join(); - String attributes = "{\"message_type\": \"BLOCK\"}"; + Map<String, Bytes> attributes = + Collections.singletonMap("message_type", Bytes.wrap("BLOCK".getBytes(StandardCharsets.UTF_8))); server1.gossip(attributes, Bytes.fromHexString("deadbeef")); for (int i = 0; i < 10; i++) { Thread.sleep(500); @@ -134,7 +139,8 @@ class VertxGossipServerTest { server1.connectTo("127.0.0.1", 10001).join(); server3.connectTo("127.0.0.1", 10001).join(); - String attributes = "{\"message_type\": \"BLOCK\"}"; + Map<String, Bytes> attributes = + Collections.singletonMap("message_type", Bytes.wrap("BLOCK".getBytes(StandardCharsets.UTF_8))); server1.gossip(attributes, Bytes.fromHexString("deadbeef")); for (int i = 0; i < 10; i++) { Thread.sleep(500); @@ -202,7 +208,8 @@ class VertxGossipServerTest { server2.connectTo("127.0.0.1", 10002).join(); server1.connectTo("127.0.0.1", 10002).join(); assertEquals(2, peerRepository1.eagerPushPeers().size()); - String attributes = "{\"message_type\": \"BLOCK\"}"; + Map<String, Bytes> attributes = + Collections.singletonMap("message_type", Bytes.wrap("BLOCK".getBytes(StandardCharsets.UTF_8))); server1.gossip(attributes, Bytes.fromHexString("deadbeef")); Thread.sleep(1000); assertEquals(Bytes.fromHexString("deadbeef"), messageReceived2.message); @@ -250,7 +257,8 @@ class VertxGossipServerTest { server1.connectTo("127.0.0.1", 10001).join(); assertEquals(1, peerRepository1.eagerPushPeers().size()); - String attributes = "{\"message_type\": \"BLOCK\"}"; + Map<String, Bytes> attributes = + Collections.singletonMap("message_type", Bytes.wrap("BLOCK".getBytes(StandardCharsets.UTF_8))); server1.send(peerRepository1.peers().iterator().next(), attributes, Bytes.fromHexString("deadbeef")); Thread.sleep(1000); assertEquals(Bytes.fromHexString("deadbeef"), messageReceived2.message); diff --git a/plumtree-vertx/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java b/plumtree-vertx/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java index fb08b6eb6..316a1657e 100644 --- a/plumtree-vertx/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java +++ b/plumtree-vertx/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java @@ -15,7 +15,7 @@ package org.apache.tuweni.plumtree.vertx; import org.apache.tuweni.bytes.Bytes; import org.apache.tuweni.concurrent.AsyncCompletion; import org.apache.tuweni.concurrent.CompletableAsyncCompletion; -import org.apache.tuweni.plumtree.MessageHashing; +import org.apache.tuweni.plumtree.MessageIdentity; import org.apache.tuweni.plumtree.MessageListener; import org.apache.tuweni.plumtree.MessageSender; import org.apache.tuweni.plumtree.MessageValidator; @@ -25,13 +25,20 @@ import org.apache.tuweni.plumtree.PeerRepository; import org.apache.tuweni.plumtree.State; import java.io.IOException; +import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.Nullable; +import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationContext; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.deser.std.StdDeserializer; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.fasterxml.jackson.databind.ser.std.StdSerializer; import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; import io.vertx.core.net.NetClient; @@ -47,10 +54,42 @@ public final class VertxGossipServer { private static final ObjectMapper mapper = new ObjectMapper(); + private final static class BytesSerializer extends StdSerializer<Bytes> { + + public BytesSerializer() { + super(Bytes.class); + } + + @Override + public void serialize(Bytes value, JsonGenerator gen, SerializerProvider provider) throws IOException { + gen.writeString(value.toHexString()); + } + } + static class BytesDeserializer extends StdDeserializer<Bytes> { + + BytesDeserializer() { + super(Bytes.class); + } + + @Override + public Bytes deserialize(JsonParser p, DeserializationContext ctxt) throws IOException { + String value = p.getValueAsString(); + return Bytes.fromHexStringLenient(value); + } + + } + + static { + SimpleModule module = new SimpleModule(); + module.addSerializer(Bytes.class, new BytesSerializer()); + module.addDeserializer(Bytes.class, new BytesDeserializer()); + mapper.registerModule(module); + } + private static final class Message { public MessageSender.Verb verb; - public String attributes; + public Map<String, Bytes> attributes; public String hash; public String payload; } @@ -69,8 +108,7 @@ public final class VertxGossipServer { buffer = Bytes.concatenate(buffer, Bytes.wrapBuffer(data)); while (!buffer.isEmpty()) { Message message; - try { - JsonParser parser = mapper.getFactory().createParser(buffer.toArrayUnsafe()); + try (JsonParser parser = mapper.getFactory().createParser(buffer.toArrayUnsafe())) { message = parser.readValueAs(Message.class); buffer = buffer.slice((int) parser.getCurrentLocation().getByteOffset()); } catch (IOException e) { @@ -109,7 +147,7 @@ public final class VertxGossipServer { private NetClient client; private final int graftDelay; private final int lazyQueueInterval; - private final MessageHashing messageHashing; + private final MessageIdentity messageIdentity; private final String networkInterface; private final MessageListener payloadListener; private final MessageValidator payloadValidator; @@ -125,7 +163,7 @@ public final class VertxGossipServer { Vertx vertx, String networkInterface, int port, - MessageHashing messageHashing, + MessageIdentity messageIdentity, PeerRepository peerRepository, MessageListener payloadListener, @Nullable MessageValidator payloadValidator, @@ -135,7 +173,7 @@ public final class VertxGossipServer { this.vertx = vertx; this.networkInterface = networkInterface; this.port = port; - this.messageHashing = messageHashing; + this.messageIdentity = messageIdentity; this.peerRepository = peerRepository; this.payloadListener = payloadListener; this.payloadValidator = payloadValidator == null ? (bytes, peer) -> true : payloadValidator; @@ -159,7 +197,7 @@ public final class VertxGossipServer { if (res.failed()) { completion.completeExceptionally(res.cause()); } else { - state = new State(peerRepository, messageHashing, (verb, attributes, peer, hash, payload) -> { + state = new State(peerRepository, messageIdentity, (verb, attributes, peer, hash, payload) -> { vertx.executeBlocking(future -> { Message message = new Message(); message.verb = verb; @@ -242,7 +280,7 @@ public final class VertxGossipServer { * @param attributes the payload to propagate * @param message the payload to propagate */ - public void gossip(String attributes, Bytes message) { + public void gossip(Map<String, Bytes> attributes, Bytes message) { if (!started.get()) { throw new IllegalStateException("Server has not started"); } @@ -256,7 +294,7 @@ public final class VertxGossipServer { * @param attributes the payload to propagate * @param message the payload to propagate */ - public void send(Peer peer, String attributes, Bytes message) { + public void send(Peer peer, Map<String, Bytes> attributes, Bytes message) { if (!started.get()) { throw new IllegalStateException("Server has not started"); } diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageHashing.java b/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageIdentity.java similarity index 81% rename from plumtree/src/main/java/org/apache/tuweni/plumtree/MessageHashing.java rename to plumtree/src/main/java/org/apache/tuweni/plumtree/MessageIdentity.java index 3257c618f..9fd6cbe82 100644 --- a/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageHashing.java +++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageIdentity.java @@ -18,8 +18,14 @@ import org.apache.tuweni.bytes.Bytes; * Produces an identifiable footprint for a message (generally a hash) that can be passed on to other peers to identify * uniquely a message being propagated. */ -public interface MessageHashing { +public interface MessageIdentity { - public Bytes hash(Bytes message); + /** + * Generates the identity of the message + * + * @param message the message from which to extract an identity + * @return the identity of the message + */ + Bytes identity(Bytes message); } diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageListener.java b/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageListener.java index 9cda77f18..b5640b605 100644 --- a/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageListener.java +++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageListener.java @@ -14,6 +14,8 @@ package org.apache.tuweni.plumtree; import org.apache.tuweni.bytes.Bytes; +import java.util.Map; + /** * Listens to an incoming message, along with its attributes. */ @@ -26,5 +28,5 @@ public interface MessageListener { * @param attributes the attributes of the message * @param peer the peer we received the message from */ - public void listen(Bytes messageBody, String attributes, Peer peer); + public void listen(Bytes messageBody, Map<String, Bytes> attributes, Peer peer); } diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageSender.java b/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageSender.java index 8a8658000..6b8645bd4 100644 --- a/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageSender.java +++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/MessageSender.java @@ -14,6 +14,7 @@ package org.apache.tuweni.plumtree; import org.apache.tuweni.bytes.Bytes; +import java.util.Map; import javax.annotation.Nullable; /** @@ -56,6 +57,6 @@ public interface MessageSender { * @param hash the hash of the message * @param payload the bytes to send */ - void sendMessage(Verb verb, String attributes, Peer peer, Bytes hash, @Nullable Bytes payload); + void sendMessage(Verb verb, Map<String, Bytes> attributes, Peer peer, Bytes hash, @Nullable Bytes payload); } diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java b/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java index b6b67d1e6..6b175fbbf 100644 --- a/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java +++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/State.java @@ -37,7 +37,7 @@ public final class State { private static final int maxMessagesHandlers = 1000000; private final PeerRepository peerRepository; - private final MessageHashing messageHashingFunction; + private final MessageIdentity messageIdentityFunction; private final Map<Bytes, MessageHandler> messageHandlers = Collections.synchronizedMap(new LinkedHashMap<>() { @Override @@ -54,8 +54,8 @@ public final class State { private final Timer timer = new Timer("plumtree", true); private final long delay; - public void sendMessage(Peer peer, String attributes, Bytes message) { - Bytes messageHash = messageHashingFunction.hash(message); + public void sendMessage(Peer peer, Map<String, Bytes> attributes, Bytes message) { + Bytes messageHash = messageIdentityFunction.identity(message); messageSender.sendMessage(MessageSender.Verb.SEND, attributes, peer, messageHash, message); } @@ -78,7 +78,7 @@ public final class State { * @param sender the sender - may be null if we are submitting this message to the network * @param message the payload to send to the network */ - void fullMessageReceived(@Nullable Peer sender, String attributes, Bytes message) { + void fullMessageReceived(@Nullable Peer sender, Map<String, Bytes> attributes, Bytes message) { if (receivedFullMessage.compareAndSet(false, true)) { for (TimerTask task : tasks) { task.cancel(); @@ -145,7 +145,7 @@ public final class State { * Constructor using default time constants. * * @param peerRepository the peer repository to use to store and access peer information. - * @param messageHashingFunction the function to use to hash messages into hashes to compare them. + * @param messageIdentityFunction the function to use to hash messages into hashes to compare them. * @param messageSender a function abstracting sending messages to other peers. * @param messageListener a function consuming messages when they are gossiped. * @param messageValidator a function validating messages before they are gossiped to other peers. @@ -153,14 +153,14 @@ public final class State { */ public State( PeerRepository peerRepository, - MessageHashing messageHashingFunction, + MessageIdentity messageIdentityFunction, MessageSender messageSender, MessageListener messageListener, MessageValidator messageValidator, PeerPruning peerPruningFunction) { this( peerRepository, - messageHashingFunction, + messageIdentityFunction, messageSender, messageListener, messageValidator, @@ -173,7 +173,7 @@ public final class State { * Default constructor. * * @param peerRepository the peer repository to use to store and access peer information. - * @param messageHashingFunction the function to use to hash messages into hashes to compare them. + * @param messageIdentityFunction the function to use to hash messages into hashes to compare them. * @param messageSender a function abstracting sending messages to other peers. * @param messageListener a function consuming messages when they are gossiped. * @param messageValidator a function validating messages before they are gossiped to other peers. @@ -184,7 +184,7 @@ public final class State { */ public State( PeerRepository peerRepository, - MessageHashing messageHashingFunction, + MessageIdentity messageIdentityFunction, MessageSender messageSender, MessageListener messageListener, MessageValidator messageValidator, @@ -192,7 +192,7 @@ public final class State { long graftDelay, long lazyQueueInterval) { this.peerRepository = peerRepository; - this.messageHashingFunction = messageHashingFunction; + this.messageIdentityFunction = messageIdentityFunction; this.messageSender = messageSender; this.messageListener = messageListener; this.messageValidator = messageValidator; @@ -233,8 +233,8 @@ public final class State { * @param message the message * @param messageHash the hash of the message */ - public void receiveGossipMessage(Peer peer, String attributes, Bytes message, Bytes messageHash) { - Bytes checkHash = messageHashingFunction.hash(message); + public void receiveGossipMessage(Peer peer, Map<String, Bytes> attributes, Bytes message, Bytes messageHash) { + Bytes checkHash = messageIdentityFunction.identity(message); if (!checkHash.equals(messageHash)) { return; } @@ -281,8 +281,8 @@ public final class State { * @param attributes of the message * @return The associated hash of the message */ - public Bytes sendGossipMessage(String attributes, Bytes message) { - Bytes messageHash = messageHashingFunction.hash(message); + public Bytes sendGossipMessage(Map<String, Bytes> attributes, Bytes message) { + Bytes messageHash = messageIdentityFunction.identity(message); MessageHandler handler = messageHandlers.computeIfAbsent(messageHash, MessageHandler::new); handler.fullMessageReceived(null, attributes, message); return messageHash; diff --git a/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java b/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java index 2e4af8e67..96750f522 100644 --- a/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java +++ b/plumtree/src/test/java/org/apache/tuweni/plumtree/StateTest.java @@ -21,6 +21,9 @@ import org.apache.tuweni.bytes.Bytes32; import org.apache.tuweni.crypto.Hash; import org.apache.tuweni.junit.BouncyCastleExtension; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.Map; import java.util.UUID; import org.jetbrains.annotations.NotNull; @@ -47,7 +50,7 @@ class StateTest { Bytes payload; @Override - public void sendMessage(Verb verb, String attributes, Peer peer, Bytes hash, Bytes payload) { + public void sendMessage(Verb verb, Map<String, Bytes> attributes, Peer peer, Bytes hash, Bytes payload) { this.verb = verb; this.peer = peer; this.hash = hash; @@ -181,7 +184,8 @@ class StateTest { Peer otherPeer = new PeerImpl(); state.addPeer(otherPeer); Bytes32 msg = Bytes32.random(); - String attributes = "{\"message_type\": \"BLOCK\"}"; + Map<String, Bytes> attributes = + Collections.singletonMap("message_type", Bytes.wrap("BLOCK".getBytes(StandardCharsets.UTF_8))); state.receiveGossipMessage(peer, attributes, msg, Hash.keccak256(msg)); assertEquals(msg, messageSender.payload); assertEquals(otherPeer, messageSender.peer); @@ -201,7 +205,8 @@ class StateTest { Peer lazyPeer = new PeerImpl(); state.addPeer(lazyPeer); repo.moveToLazy(lazyPeer); - String attributes = "{\"message_type\": \"BLOCK\"}"; + Map<String, Bytes> attributes = + Collections.singletonMap("message_type", Bytes.wrap("BLOCK".getBytes(StandardCharsets.UTF_8))); state.receiveGossipMessage(peer, attributes, msg, Hash.keccak256(msg)); assertEquals(msg, messageSender.payload); assertEquals(otherPeer, messageSender.peer); @@ -225,7 +230,8 @@ class StateTest { state.addPeer(lazyPeer); repo.moveToLazy(lazyPeer); Bytes message = Bytes32.random(); - String attributes = "{\"message_type\": \"BLOCK\"}"; + Map<String, Bytes> attributes = + Collections.singletonMap("message_type", Bytes.wrap("BLOCK".getBytes(StandardCharsets.UTF_8))); state.receiveGossipMessage(peer, attributes, message, Hash.keccak256(message)); state.receiveIHaveMessage(lazyPeer, message); assertNull(messageSender.payload); @@ -279,7 +285,8 @@ class StateTest { Bytes message = Bytes32.random(); state.receiveIHaveMessage(lazyPeer, Hash.keccak256(message)); Thread.sleep(100); - String attributes = "{\"message_type\": \"BLOCK\"}"; + Map<String, Bytes> attributes = + Collections.singletonMap("message_type", Bytes.wrap("BLOCK".getBytes(StandardCharsets.UTF_8))); state.receiveGossipMessage(peer, attributes, message, Hash.keccak256(message)); Thread.sleep(500); assertNull(messageSender.verb); @@ -295,7 +302,8 @@ class StateTest { new State(repo, Hash::keccak256, messageSender, messageListener, (message, peer) -> true, (peer) -> true); Peer peer = new PeerImpl(); Bytes message = Bytes32.random(); - String attributes = "{\"message_type\": \"BLOCK\"}"; + Map<String, Bytes> attributes = + Collections.singletonMap("message_type", Bytes.wrap("BLOCK".getBytes(StandardCharsets.UTF_8))); state.receiveGossipMessage(peer, attributes, message, Hash.keccak256(message)); assertEquals(1, repo.eagerPushPeers().size()); assertEquals(0, repo.lazyPushPeers().size()); @@ -311,7 +319,8 @@ class StateTest { Peer peer = new PeerImpl(); Peer secondPeer = new PeerImpl(); Bytes message = Bytes32.random(); - String attributes = "{\"message_type\": \"BLOCK\"}"; + Map<String, Bytes> attributes = + Collections.singletonMap("message_type", Bytes.wrap("BLOCK".getBytes(StandardCharsets.UTF_8))); state.receiveGossipMessage(peer, attributes, message, Hash.keccak256(message)); state.receiveGossipMessage(secondPeer, attributes, message, Hash.keccak256(message)); assertEquals(2, repo.eagerPushPeers().size()); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@tuweni.apache.org For additional commands, e-mail: commits-h...@tuweni.apache.org