This is an automated email from the ASF dual-hosted git repository. toulmean pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git
The following commit(s) were added to refs/heads/main by this push: new 09abe88 Fix a series of issues blocking 2 clients from talking to each other new ea2f814 Merge pull request #365 from atoulme/connect_two_ethereum_clients 09abe88 is described below commit 09abe88d5fe8343f780ea03459f4f1cb848aaac5 Author: Antoine Toulme <anto...@lunar-ocean.com> AuthorDate: Mon Jan 17 23:38:20 2022 -0800 Fix a series of issues blocking 2 clients from talking to each other --- .../org/apache/tuweni/devp2p/eth/EthHandler.kt | 4 ++++ .../org/apache/tuweni/devp2p/eth/EthHandler66.kt | 6 +++++- .../org/apache/tuweni/devp2p/eth/EthHandlerTest.kt | 13 +++++++++++ .../tuweni/ethclient/EthereumClientRunTest.kt | 25 +++++++++++++--------- .../org/apache/tuweni/ethclient/EthereumClient.kt | 14 ++++++------ .../tuweni/ethclient/EthereumPeerRepository.kt | 9 ++++---- .../org/apache/tuweni/ethclient/Synchronizer.kt | 4 +++- .../WireConnectionPeerRepositoryAdapter.kt | 5 +++-- .../java/org/apache/tuweni/rlp/BytesRLPReader.java | 2 +- .../org/apache/tuweni/rlp/BytesRLPReaderTest.java | 4 ++-- .../tuweni/rlpx/wire/DefaultWireConnection.java | 10 ++++----- 11 files changed, 63 insertions(+), 33 deletions(-) diff --git a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler.kt b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler.kt index 2169599..9290232 100644 --- a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler.kt +++ b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler.kt @@ -197,6 +197,10 @@ internal class EthHandler( } private suspend fun handleGetBlockBodies(connection: WireConnection, message: GetBlockBodies) { + if (message.hashes.isEmpty()) { + service.disconnect(connection, DisconnectReason.SUBPROTOCOL_REASON) + return + } service.send( connection.agreedSubprotocolVersion(ETH62.name()), MessageType.BlockBodies.code, diff --git a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler66.kt b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler66.kt index 05bfcef..c45ad6b 100644 --- a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler66.kt +++ b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler66.kt @@ -56,7 +56,7 @@ internal class EthHandler66( } return asyncCompletion { logger.debug("Receiving message of type {}", messageType) - val pair = RLP.decode(message) { + val pair = RLP.decodeList(message) { Pair(it.readValue(), it.readRemaining()) } val requestIdentifier = pair.first @@ -232,6 +232,10 @@ internal class EthHandler66( } private suspend fun handleGetBlockBodies(connection: WireConnection, requestIdentifier: Bytes, message: GetBlockBodies) { + if (message.hashes.isEmpty()) { + service.disconnect(connection, DisconnectReason.SUBPROTOCOL_REASON) + return + } val bodies = BlockBodies(controller.findBlockBodies(message.hashes)) service.send( EthSubprotocol.ETH66, diff --git a/devp2p-eth/src/test/kotlin/org/apache/tuweni/devp2p/eth/EthHandlerTest.kt b/devp2p-eth/src/test/kotlin/org/apache/tuweni/devp2p/eth/EthHandlerTest.kt index 3351244..adfac50 100644 --- a/devp2p-eth/src/test/kotlin/org/apache/tuweni/devp2p/eth/EthHandlerTest.kt +++ b/devp2p-eth/src/test/kotlin/org/apache/tuweni/devp2p/eth/EthHandlerTest.kt @@ -42,6 +42,7 @@ import org.apache.tuweni.junit.VertxExtension import org.apache.tuweni.kv.MapKeyValueStore import org.apache.tuweni.rlp.RLP import org.apache.tuweni.rlpx.RLPxService +import org.apache.tuweni.rlpx.wire.DisconnectReason import org.apache.tuweni.rlpx.wire.WireConnection import org.apache.tuweni.units.bigints.UInt256 import org.apache.tuweni.units.bigints.UInt64 @@ -278,4 +279,16 @@ class EthHandlerTest { assertEquals(1, messageRead.transactionReceipts.size) assertEquals(txReceipt, messageRead.transactionReceipts[0][0]) } + + @Test + fun testGetBodiesEmpty(): Unit = runBlocking { + val conn = mock(WireConnection::class.java) + handler.handle( + conn, + MessageType.GetBlockBodies.code, + GetBlockBodies(listOf()).toBytes() + ).await() + + verify(service).disconnect(conn, DisconnectReason.SUBPROTOCOL_REASON) + } } diff --git a/eth-client/src/integrationTest/kotlin/org/apache/tuweni/ethclient/EthereumClientRunTest.kt b/eth-client/src/integrationTest/kotlin/org/apache/tuweni/ethclient/EthereumClientRunTest.kt index 3c35046..8998f36 100644 --- a/eth-client/src/integrationTest/kotlin/org/apache/tuweni/ethclient/EthereumClientRunTest.kt +++ b/eth-client/src/integrationTest/kotlin/org/apache/tuweni/ethclient/EthereumClientRunTest.kt @@ -19,10 +19,14 @@ package org.apache.tuweni.ethclient import io.vertx.core.Vertx import kotlinx.coroutines.delay import kotlinx.coroutines.runBlocking +import org.apache.tuweni.concurrent.AsyncResult +import org.apache.tuweni.concurrent.coroutines.await import org.apache.tuweni.crypto.SECP256K1 import org.apache.tuweni.junit.BouncyCastleExtension import org.apache.tuweni.junit.VertxExtension import org.apache.tuweni.junit.VertxInstance +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNotNull import org.junit.jupiter.api.Disabled import org.junit.jupiter.api.Test import org.junit.jupiter.api.extension.ExtendWith @@ -37,16 +41,11 @@ class EthereumClientRunTest { """ [peerRepository.default] type="memory" - [metrics] - networkInterface="127.0.0.1" - port=9091 [storage.default] path="data" genesis="default" [genesis.default] path="classpath:/default.json" - [static.default] - peerRepository="default" [rlpx.default] networkInterface="127.0.0.1" port=30301 @@ -57,9 +56,6 @@ class EthereumClientRunTest { """ [peerRepository.default] type="memory" - [metrics] - networkInterface="127.0.0.1" - port=9092 [storage.default] path="data2" genesis="default" @@ -68,14 +64,23 @@ class EthereumClientRunTest { [static.default] enodes=["enode://${keyPair.publicKey().toHexString()}@127.0.0.1:30301"] peerRepository="default" + [rlpx.default] + networkInterface="127.0.0.1" + port=30304 + key="${SECP256K1.KeyPair.random().secretKey().bytes().toHexString()}" """.trimMargin() ) val client1 = EthereumClient(vertx, config1) val client2 = EthereumClient(vertx, config2) client1.start() + val connectionInfo = AsyncResult.incomplete<EthereumConnection>() + client1.peerRepositories["default"]!!.addStatusListener { connectionInfo.complete(it) } client2.start() - delay(1000) - // TODO make sure the connection happens! + + val conn = connectionInfo.await() + assertNotNull(conn) + assertNotNull(conn.status()) + assertEquals(client2.config.rlpxServices()[0].keyPair().publicKey(), conn.peer().id().publicKey()) client1.stop() client2.stop() } diff --git a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClient.kt b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClient.kt index 8544dcb..9f1a958 100644 --- a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClient.kt +++ b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClient.kt @@ -244,6 +244,12 @@ class EthereumClient( adapter ) services[rlpxConfig.getName()] = service + peerRepository.addIdentityListener { + service.connectTo( + it.publicKey(), + InetSocketAddress(it.networkInterface(), it.port()) + ) + } service.start().thenRun { logger.info("Started Ethereum client ${rlpxConfig.getName()}") val proxyClient = service.getClient(ProxySubprotocol.ID) as ProxyClient @@ -265,12 +271,7 @@ class EthereumClient( } } } - peerRepository.addIdentityListener { - service.connectTo( - it.publicKey(), - InetSocketAddress(it.networkInterface(), it.port()) - ) - } + val synchronizer = PeerStatusEthSynchronizer( repository = repository, client = service.getClient(ETH66) as EthRequestsManager, @@ -293,6 +294,7 @@ class EthereumClient( ) synchronizers[rlpxConfig.getName() + "best"] = bestSynchronizer bestSynchronizer.start() + logger.info("Finished configuring Ethereum client ${rlpxConfig.getName()}") } } ).await() diff --git a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumPeerRepository.kt b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumPeerRepository.kt index edca79b..f680ec8 100644 --- a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumPeerRepository.kt +++ b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumPeerRepository.kt @@ -33,10 +33,10 @@ import java.util.stream.Stream interface EthereumPeerRepository : PeerRepository { /** * Stores the status message sent for a connection - * @param connId the ID of the connection + * @param peerIdentity the peer identity * @param status the status message */ - fun storeStatus(connId: String, status: Status) + fun storeStatus(peerIdentity: Identity, status: Status) /** * Provides a stream of active connections. @@ -87,8 +87,9 @@ class MemoryEthereumPeerRepository : EthereumPeerRepository { val statusListeners = HashMap<String, (EthereumConnection) -> Unit>() val identityListeners = HashMap<String, (Identity) -> Unit>() - override fun storeStatus(connId: String, status: Status) { - connections[connId]?.let { conn -> + override fun storeStatus(peerIdentity: Identity, status: Status) { + val connKey = peerMap[peerIdentity]?.let { createConnectionKey(it, peerIdentity) } + connections[connKey]?.let { conn -> (conn as MemoryEthereumConnection).status = status statusListeners.values.forEach { it(conn) diff --git a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/Synchronizer.kt b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/Synchronizer.kt index 3254e16..e0db77d 100644 --- a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/Synchronizer.kt +++ b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/Synchronizer.kt @@ -54,7 +54,9 @@ abstract class Synchronizer( } } }.awaitAll() - client.requestBlockBodies(bodiesToRequest) + if (!bodiesToRequest.isEmpty()) { + client.requestBlockBodies(bodiesToRequest) + } } } } diff --git a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/WireConnectionPeerRepositoryAdapter.kt b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/WireConnectionPeerRepositoryAdapter.kt index c50c902..ccb0112 100644 --- a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/WireConnectionPeerRepositoryAdapter.kt +++ b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/WireConnectionPeerRepositoryAdapter.kt @@ -17,6 +17,7 @@ package org.apache.tuweni.ethclient import org.apache.tuweni.devp2p.eth.Status +import org.apache.tuweni.peer.repository.Identity import org.apache.tuweni.rlpx.WireConnectionRepository import org.apache.tuweni.rlpx.wire.SubProtocolIdentifier import org.apache.tuweni.rlpx.wire.WireConnection @@ -28,7 +29,7 @@ import java.util.concurrent.ConcurrentHashMap */ class WireConnectionPeerRepositoryAdapter(val peerRepository: EthereumPeerRepository) : WireConnectionRepository { - private val wireConnectionToIdentities = ConcurrentHashMap<String, String>() + private val wireConnectionToIdentities = ConcurrentHashMap<String, Identity>() private val connections = ConcurrentHashMap<String, WireConnection>() private val connectionListeners = ArrayList<WireConnectionRepository.Listener>() @@ -49,7 +50,7 @@ class WireConnectionPeerRepositoryAdapter(val peerRepository: EthereumPeerReposi val peer = peerRepository.storePeer(id, Instant.now(), Instant.now()) peerRepository.addConnection(peer, id) connections[id.id()] = wireConnection - wireConnectionToIdentities[wireConnection.uri()] = id.id() + wireConnectionToIdentities[wireConnection.uri()] = peer.id() wireConnection.registerListener { if (it == WireConnection.Event.CONNECTED) { for (listener in connectionListeners) { diff --git a/rlp/src/main/java/org/apache/tuweni/rlp/BytesRLPReader.java b/rlp/src/main/java/org/apache/tuweni/rlp/BytesRLPReader.java index 8a4bec0..415df60 100644 --- a/rlp/src/main/java/org/apache/tuweni/rlp/BytesRLPReader.java +++ b/rlp/src/main/java/org/apache/tuweni/rlp/BytesRLPReader.java @@ -38,7 +38,7 @@ final class BytesRLPReader implements RLPReader { if (remaining == 0) { return Bytes.EMPTY; } - return content.slice(remaining); + return content.slice(index++, remaining); } @Override diff --git a/rlp/src/test/java/org/apache/tuweni/rlp/BytesRLPReaderTest.java b/rlp/src/test/java/org/apache/tuweni/rlp/BytesRLPReaderTest.java index 6b0d245..b16f9e5 100644 --- a/rlp/src/test/java/org/apache/tuweni/rlp/BytesRLPReaderTest.java +++ b/rlp/src/test/java/org/apache/tuweni/rlp/BytesRLPReaderTest.java @@ -222,11 +222,11 @@ class BytesRLPReaderTest { @Test void shouldReadRemaining() { - Bytes input = Bytes.fromHexString("83646f6783646f67"); + Bytes input = Bytes.fromHexString("83646f6783646f6783646f67"); RLP.decode(input, reader -> { reader.readValue(); assertEquals(4, reader.position()); - assertEquals(Bytes.fromHexString("83646f67"), reader.readRemaining()); + assertEquals(Bytes.fromHexString("83646f6783646f67"), reader.readRemaining()); return null; }); } diff --git a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultWireConnection.java b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultWireConnection.java index ebae524..73e1f8b 100644 --- a/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultWireConnection.java +++ b/rlpx/src/main/java/org/apache/tuweni/rlpx/wire/DefaultWireConnection.java @@ -205,11 +205,9 @@ public final class DefaultWireConnection implements WireConnection { int offset = subProtocolEntry.getKey().lowerEndpoint(); logger.trace("Received message of type {}", message.messageId() - offset); SubProtocolHandler handler = subprotocols.get(subProtocolEntry.getValue()); - try { - handler.handle(this, message.messageId() - offset, message.content()); - } catch (Throwable t) { - logger.error("Handler " + handler.toString() + " threw an exception", t); - } + handler + .handle(this, message.messageId() - offset, message.content()) + .exceptionally(t -> logger.error("Handler " + handler.toString() + " threw an exception", t)); } } } @@ -263,7 +261,7 @@ public final class DefaultWireConnection implements WireConnection { /** * Sends a ping message to the remote peer. - * + * * @return a handler marking completion when a pong response is received */ public AsyncCompletion sendPing() { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@tuweni.apache.org For additional commands, e-mail: commits-h...@tuweni.apache.org