This is an automated email from the ASF dual-hosted git repository. toulmean pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git
The following commit(s) were added to refs/heads/main by this push: new 313084b Make status sync recursive, and fix issues found new 3acba73 Merge pull request #395 from atoulme/fix_sync 313084b is described below commit 313084b47f55db8a901bc0b3427125b95a31b54d Author: Antoine Toulme <anto...@lunar-ocean.com> AuthorDate: Sat Mar 26 00:45:24 2022 -0700 Make status sync recursive, and fix issues found --- .../org/apache/tuweni/devp2p/eth/EthController.kt | 2 +- .../tuweni/ethclient/EthereumClientConfig.kt | 6 ++-- .../tuweni/ethclient/PeerStatusEthSynchronizer.kt | 34 ++++++++++++++++++++-- .../org/apache/tuweni/ethclient/Synchronizer.kt | 3 ++ 4 files changed, 40 insertions(+), 5 deletions(-) diff --git a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthController.kt b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthController.kt index bc6c6a4..97eb021 100644 --- a/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthController.kt +++ b/devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthController.kt @@ -129,7 +129,7 @@ class EthController( return } val hashes = request.data as List<*> - for (i in 0..hashes.size) { + for (i in 0 until hashes.size) { repository.storeBlockBody(hashes[i] as Hash, bodies[i]) } request.handle.complete(bodies) diff --git a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClientConfig.kt b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClientConfig.kt index 83833ea..7be915c 100644 --- a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClientConfig.kt +++ b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClientConfig.kt @@ -193,7 +193,7 @@ class EthereumClientConfig(private var config: Configuration = Configuration.emp return emptyList() } return synchronizers.map { section -> - val sectionConfig = config.getConfigurationSection("proxy.$section") + val sectionConfig = config.getConfigurationSection("synchronizer.$section") SynchronizerConfigurationImpl( section, SynchronizerType.valueOf(sectionConfig.getString("type")), UInt256.valueOf(sectionConfig.getLong("from")), @@ -474,7 +474,9 @@ internal data class GenesisFileConfigurationImpl(private val name: String, priva override fun genesisFile(): GenesisFile = GenesisFile.read( if (genesisFilePath.scheme == "classpath") { - GenesisFileConfigurationImpl::class.java.getResource(genesisFilePath.path).readBytes() + val resource = GenesisFileConfigurationImpl::class.java.getResource(genesisFilePath.path) + ?: throw IllegalArgumentException("No such classpath resource: ${genesisFilePath.path}") + resource.readBytes() } else { Files.readAllBytes(Path.of(genesisFilePath)) } diff --git a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/PeerStatusEthSynchronizer.kt b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/PeerStatusEthSynchronizer.kt index 0752b13..19f72a1 100644 --- a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/PeerStatusEthSynchronizer.kt +++ b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/PeerStatusEthSynchronizer.kt @@ -19,6 +19,7 @@ package org.apache.tuweni.ethclient import kotlinx.coroutines.asCoroutineDispatcher import kotlinx.coroutines.launch import org.apache.tuweni.devp2p.eth.EthRequestsManager +import org.apache.tuweni.eth.BlockHeader import org.apache.tuweni.eth.Hash import org.apache.tuweni.eth.repository.BlockchainRepository import org.apache.tuweni.units.bigints.UInt256 @@ -41,7 +42,7 @@ class PeerStatusEthSynchronizer( client: EthRequestsManager, peerRepository: EthereumPeerRepository, private val adapter: WireConnectionPeerRepositoryAdapter, - from: UInt256?, + from: UInt256? = UInt256.ZERO, to: UInt256? ) : Synchronizer(executor, coroutineContext, repository, client, peerRepository, from, to) { @@ -70,7 +71,36 @@ class PeerStatusEthSynchronizer( 0L, true, adapter.get(ethereumConnection) - ).thenAccept(::addHeaders) + ).thenAccept { + addHeaders(it) + launch { + repeatAskingForHeaders(ethereumConnection, it.last()) + } + } + } + } + } + + private suspend fun repeatAskingForHeaders(ethereumConnection: EthereumConnection, header: BlockHeader) { + if (from?.greaterOrEqualThan(header.number) == true) { + logger.info("Status synchronizer done with ${ethereumConnection.identity()}") + return + } + if (repository.hasBlockHeader(header.hash)) { + logger.info("Status synchronizer hitting known header ${header.number}, stop there.") + return + } + logger.info("Requesting headers for ${ethereumConnection.identity()} at block ${header.number}") + client.requestBlockHeaders( + Hash.fromBytes(header.hash), + HEADER_REQUEST_SIZE, + 0L, + true, + adapter.get(ethereumConnection) + ).thenAccept { + addHeaders(it) + launch { + repeatAskingForHeaders(ethereumConnection, it.last()) } } } diff --git a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/Synchronizer.kt b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/Synchronizer.kt index 2cac6a3..85a47e9 100644 --- a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/Synchronizer.kt +++ b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/Synchronizer.kt @@ -58,7 +58,10 @@ abstract class Synchronizer( } }.awaitAll() if (!bodiesToRequest.isEmpty()) { + logger.info("Requesting ${bodiesToRequest.size} block bodies") client.requestBlockBodies(bodiesToRequest) + } else { + logger.info("No bodies requested") } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@tuweni.apache.org For additional commands, e-mail: commits-h...@tuweni.apache.org