This is an automated email from the ASF dual-hosted git repository. toulmean pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git
The following commit(s) were added to refs/heads/main by this push: new 7f23059 Make synchronizers configurable new fa88db3 Merge pull request #369 from atoulme/synchronizer_config 7f23059 is described below commit 7f23059aa20df0edc4f4a1ea811198fe8296ebab Author: Antoine Toulme <anto...@lunar-ocean.com> AuthorDate: Mon Jan 24 22:19:53 2022 -0800 Make synchronizers configurable --- .../apache/tuweni/config/PropertyValidator.java | 15 ++++ .../tuweni/config/PropertyValidatorTest.java | 8 ++ .../org/apache/tuweni/ethclient/EthereumClient.kt | 75 +++++++++++-------- .../tuweni/ethclient/EthereumClientConfig.kt | 85 ++++++++++++++++++++-- ...nizer.kt => FromBestBlockHeaderSynchronizer.kt} | 10 ++- .../ethclient/FromUnknownParentSynchronizer.kt | 10 ++- .../tuweni/ethclient/PeerStatusEthSynchronizer.kt | 7 +- .../org/apache/tuweni/ethclient/Synchronizer.kt | 5 +- 8 files changed, 172 insertions(+), 43 deletions(-) diff --git a/config/src/main/java/org/apache/tuweni/config/PropertyValidator.java b/config/src/main/java/org/apache/tuweni/config/PropertyValidator.java index c59c01d..7cbc3d3 100644 --- a/config/src/main/java/org/apache/tuweni/config/PropertyValidator.java +++ b/config/src/main/java/org/apache/tuweni/config/PropertyValidator.java @@ -103,6 +103,21 @@ public interface PropertyValidator<T> { } /** + * A validator that ensures a property, if present, is equal or greater than the value. + * + * @param value The lower bound (inclusive). + * @return A validator that ensures a property, if present, is equal or greater than the value. + */ + static PropertyValidator<Number> isGreaterOrEqual(long value) { + return (key, position, currentValue) -> { + if (currentValue != null && (currentValue.longValue() < value)) { + return singleError(position, "Value of property '" + key + "' is less than '" + value + "'"); + } + return noErrors(); + }; + } + + /** * A validator that ensures a property, if present, has a value within a given set. * * @param values The acceptable values. diff --git a/config/src/test/java/org/apache/tuweni/config/PropertyValidatorTest.java b/config/src/test/java/org/apache/tuweni/config/PropertyValidatorTest.java index 17e44cf..01a63e0 100644 --- a/config/src/test/java/org/apache/tuweni/config/PropertyValidatorTest.java +++ b/config/src/test/java/org/apache/tuweni/config/PropertyValidatorTest.java @@ -81,4 +81,12 @@ class PropertyValidatorTest { assertEquals(1, errors.size()); assertEquals("Value of property 'foo' should be \"one\", \"two\", or \"three \"", errors.get(0).getMessage()); } + + @Test + void validatesEqualOrGreater() { + PropertyValidator<Number> longPropertyValidator = PropertyValidator.isGreaterOrEqual(32L); + assertTrue(longPropertyValidator.validate("foo", null, 33L).isEmpty()); + assertTrue(longPropertyValidator.validate("foo", null, 32L).isEmpty()); + assertEquals(1, longPropertyValidator.validate("foo", null, 31L).size()); + } } diff --git a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClient.kt b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClient.kt index 9f1a958..6bd6b93 100644 --- a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClient.kt +++ b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClient.kt @@ -74,13 +74,13 @@ class EthereumClient( } private var metricsService: MetricsService? = null - private val genesisFiles = HashMap<String, GenesisFile>() - private val services = HashMap<String, RLPxService>() - private val storageRepositories = HashMap<String, BlockchainRepository>() - val peerRepositories = HashMap<String, EthereumPeerRepository>() - private val dnsClients = HashMap<String, DNSClient>() - private val discoveryServices = HashMap<String, DiscoveryService>() - private val synchronizers = HashMap<String, Synchronizer>() + private val genesisFiles = mutableMapOf<String, GenesisFile>() + private val services = mutableMapOf<String, RLPxService>() + private val storageRepositories = mutableMapOf<String, BlockchainRepository>() + val peerRepositories = mutableMapOf<String, EthereumPeerRepository>() + private val dnsClients = mutableMapOf<String, DNSClient>() + private val discoveryServices = mutableMapOf<String, DiscoveryService>() + private val synchronizers = mutableMapOf<String, Synchronizer>() private val managerHandler = mutableListOf<DefaultCacheManager>() @@ -272,28 +272,45 @@ class EthereumClient( } } - val synchronizer = PeerStatusEthSynchronizer( - repository = repository, - client = service.getClient(ETH66) as EthRequestsManager, - peerRepository = peerRepository, - adapter = adapter - ) - synchronizers[rlpxConfig.getName() + "status"] = synchronizer - synchronizer.start() - val parentSynchronizer = FromUnknownParentSynchronizer( - repository = repository, - client = service.getClient(ETH66) as EthRequestsManager, - peerRepository = peerRepository - ) - synchronizers[rlpxConfig.getName() + "parent"] = parentSynchronizer - parentSynchronizer.start() - val bestSynchronizer = FromBestBlockSynchronizer( - repository = repository, - client = service.getClient(ETH66) as EthRequestsManager, - peerRepository = peerRepository - ) - synchronizers[rlpxConfig.getName() + "best"] = bestSynchronizer - bestSynchronizer.start() + for (sync in config.synchronizers()) { + when (sync.getType()) { + SynchronizerType.best -> { + val bestSynchronizer = FromBestBlockHeaderSynchronizer( + repository = repository, + client = service.getClient(ETH66) as EthRequestsManager, + peerRepository = peerRepository, + from = sync.getFrom(), + to = sync.getTo(), + ) + bestSynchronizer.start() + synchronizers[sync.getName()] = bestSynchronizer + } + SynchronizerType.status -> { + val synchronizer = PeerStatusEthSynchronizer( + repository = repository, + client = service.getClient(ETH66) as EthRequestsManager, + peerRepository = peerRepository, + adapter = adapter, + from = sync.getFrom(), + to = sync.getTo(), + ) + synchronizer.start() + synchronizers[sync.getName()] = synchronizer + } + SynchronizerType.parent -> { + val parentSynchronizer = FromUnknownParentSynchronizer( + repository = repository, + client = service.getClient(ETH66) as EthRequestsManager, + peerRepository = peerRepository, + from = sync.getFrom(), + to = sync.getTo(), + ) + parentSynchronizer.start() + synchronizers[sync.getName()] = parentSynchronizer + } + } + } + logger.info("Finished configuring Ethereum client ${rlpxConfig.getName()}") } } diff --git a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClientConfig.kt b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClientConfig.kt index 64c2d76..83833ea 100644 --- a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClientConfig.kt +++ b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClientConfig.kt @@ -26,6 +26,7 @@ import org.apache.tuweni.config.SchemaBuilder import org.apache.tuweni.crypto.SECP256K1 import org.apache.tuweni.devp2p.parseEnodeUri import org.apache.tuweni.eth.genesis.GenesisFile +import org.apache.tuweni.units.bigints.UInt256 import org.slf4j.LoggerFactory import java.io.FileNotFoundException import java.net.URI @@ -186,6 +187,21 @@ class EthereumClientConfig(private var config: Configuration = Configuration.emp } } + fun synchronizers(): List<SynchronizerConfiguration> { + val synchronizers = config.sections("synchronizer") + if (synchronizers == null || synchronizers.isEmpty()) { + return emptyList() + } + return synchronizers.map { section -> + val sectionConfig = config.getConfigurationSection("proxy.$section") + SynchronizerConfigurationImpl( + section, SynchronizerType.valueOf(sectionConfig.getString("type")), + UInt256.valueOf(sectionConfig.getLong("from")), + UInt256.valueOf(sectionConfig.getLong("to")) + ) + } + } + fun validate(): Stream<ConfigurationError> { val schema = createSchema() var errors = schema.validate(this.config) @@ -205,7 +221,10 @@ class EthereumClientConfig(private var config: Configuration = Configuration.emp private fun validateSubsection(name: String, schema: Schema): Stream<ConfigurationError> { var errors = listOf<ConfigurationError>().stream() for (subSection in this.config.sections(name)) { - errors = Stream.concat(errors, schema.getSubSection(name).validate(this.config.getConfigurationSection("$name.$subSection"))) + errors = Stream.concat( + errors, + schema.getSubSection(name).validate(this.config.getConfigurationSection("$name.$subSection")) + ) } return errors } @@ -250,7 +269,12 @@ class EthereumClientConfig(private var config: Configuration = Configuration.emp val discoverySection = SchemaBuilder.create() discoverySection.addString("identity", "", "Node identity", null) discoverySection.addString("networkInterface", "127.0.0.1", "Network interface to bind", null) - discoverySection.addInteger("port", 0, "Port to expose the discovery service on", PropertyValidator.isValidPortOrZero()) + discoverySection.addInteger( + "port", + 0, + "Port to expose the discovery service on", + PropertyValidator.isValidPortOrZero() + ) discoverySection.addString("peerRepository", "default", "Peer repository to which records should go", null) val genesis = SchemaBuilder.create() @@ -276,7 +300,12 @@ class EthereumClientConfig(private var config: Configuration = Configuration.emp } ) rlpx.addInteger("port", 0, "Port to expose the RLPx service on", PropertyValidator.isValidPortOrZero()) - rlpx.addInteger("advertisedPort", 30303, "Port to advertise in communications as the RLPx service port", PropertyValidator.isValidPort()) + rlpx.addInteger( + "advertisedPort", + 30303, + "Port to advertise in communications as the RLPx service port", + PropertyValidator.isValidPort() + ) rlpx.addString("clientName", "Apache Tuweni", "Name of the Ethereum client", null) rlpx.addString("repository", "default", "Name of the blockchain repository", null) rlpx.addString("peerRepository", "default", "Peer repository to which records should go", null) @@ -287,6 +316,17 @@ class EthereumClientConfig(private var config: Configuration = Configuration.emp val peerRepositoriesSection = SchemaBuilder.create() peerRepositoriesSection.addString("type", "memory", "Peer repository type", PropertyValidator.anyOf("memory")) + + val synchronizersSection = SchemaBuilder.create() + synchronizersSection.addString( + "type", + "status", + "Synchronizer type", + PropertyValidator.anyOf("status", "parent", "best") + ) + synchronizersSection.addLong("from", 0L, "Start block to sync from", PropertyValidator.isGreaterOrEqual(0L)) + synchronizersSection.addLong("to", 0L, "End block to sync to", PropertyValidator.isGreaterOrEqual(0L)) + val builder = SchemaBuilder.create() builder.addSection("metrics", metricsSection.toSchema()) builder.addSection("storage", storageSection.toSchema()) @@ -297,6 +337,7 @@ class EthereumClientConfig(private var config: Configuration = Configuration.emp builder.addSection("genesis", genesis.toSchema()) builder.addSection("proxy", proxiesSection.toSchema()) builder.addSection("peerRepository", peerRepositoriesSection.toSchema()) + builder.addSection("synchronizer", synchronizersSection.toSchema()) return builder.toSchema() } @@ -379,7 +420,19 @@ interface PeerRepositoryConfiguration { fun getType(): String } -internal class PeerRepositoryConfigurationImpl(private val repoName: String, private val type: String) : PeerRepositoryConfiguration { +enum class SynchronizerType { + best, status, parent +} + +interface SynchronizerConfiguration { + fun getName(): String + fun getType(): SynchronizerType + fun getFrom(): UInt256? + fun getTo(): UInt256? +} + +internal class PeerRepositoryConfigurationImpl(private val repoName: String, private val type: String) : + PeerRepositoryConfiguration { override fun getName(): String = repoName override fun getType(): String = type } @@ -414,7 +467,8 @@ internal data class RLPxServiceConfigurationImpl( override fun peerRepository(): String = peerRepository } -internal data class GenesisFileConfigurationImpl(private val name: String, private val genesisFilePath: URI) : GenesisFileConfiguration { +internal data class GenesisFileConfigurationImpl(private val name: String, private val genesisFilePath: URI) : + GenesisFileConfiguration { override fun getName(): String = name override fun genesisFile(): GenesisFile = @@ -470,16 +524,33 @@ data class DiscoveryConfigurationImpl( override fun getPort() = port } -data class StaticPeersConfigurationImpl(private val enodes: List<String>, private val peerRepository: String) : StaticPeersConfiguration { +data class StaticPeersConfigurationImpl(private val enodes: List<String>, private val peerRepository: String) : + StaticPeersConfiguration { override fun enodes(): List<String> = enodes override fun peerRepository() = peerRepository } -data class ProxyConfigurationImpl(private val name: String, private val upstream: String, private val downstream: String) : ProxyConfiguration { +data class ProxyConfigurationImpl( + private val name: String, + private val upstream: String, + private val downstream: String, +) : ProxyConfiguration { override fun name() = name override fun upstream() = upstream override fun downstream() = downstream } + +data class SynchronizerConfigurationImpl( + private val name: String, + private val type: SynchronizerType, + private val from: UInt256?, + private val to: UInt256?, +) : SynchronizerConfiguration { + override fun getName() = name + override fun getType() = type + override fun getFrom(): UInt256? = from + override fun getTo(): UInt256? = to +} diff --git a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/FromBestBlockSynchronizer.kt b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/FromBestBlockHeaderSynchronizer.kt similarity index 91% rename from eth-client/src/main/kotlin/org/apache/tuweni/ethclient/FromBestBlockSynchronizer.kt rename to eth-client/src/main/kotlin/org/apache/tuweni/ethclient/FromBestBlockHeaderSynchronizer.kt index 4edb80a..d70a3bc 100644 --- a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/FromBestBlockSynchronizer.kt +++ b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/FromBestBlockHeaderSynchronizer.kt @@ -22,6 +22,7 @@ import kotlinx.coroutines.launch import org.apache.tuweni.devp2p.eth.EthRequestsManager import org.apache.tuweni.eth.BlockHeader import org.apache.tuweni.eth.repository.BlockchainRepository +import org.apache.tuweni.units.bigints.UInt256 import java.util.concurrent.ExecutorService import java.util.concurrent.Executors import kotlin.coroutines.CoroutineContext @@ -29,13 +30,15 @@ import kotlin.coroutines.CoroutineContext const val BEST_PEER_DELAY: Long = 5000 const val HEADERS_RESPONSE_TIMEOUT: Long = 10000 -class FromBestBlockSynchronizer( +class FromBestBlockHeaderSynchronizer( executor: ExecutorService = Executors.newSingleThreadExecutor(), coroutineContext: CoroutineContext = executor.asCoroutineDispatcher(), repository: BlockchainRepository, client: EthRequestsManager, peerRepository: EthereumPeerRepository, -) : Synchronizer(executor, coroutineContext, repository, client, peerRepository) { + from: UInt256?, + to: UInt256?, +) : Synchronizer(executor, coroutineContext, repository, client, peerRepository, from, to) { override fun start() { launch { @@ -51,6 +54,9 @@ class FromBestBlockSynchronizer( } private fun askNextBestHeaders(header: BlockHeader) { + if ((null != from && header.number < from) || (null != to && header.number > to)) { + return + } launch { if (peerRepository.activeConnections().count() == 0L) { askNextBestHeaders(header) diff --git a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/FromUnknownParentSynchronizer.kt b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/FromUnknownParentSynchronizer.kt index 2e9b9c4..6222608 100644 --- a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/FromUnknownParentSynchronizer.kt +++ b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/FromUnknownParentSynchronizer.kt @@ -22,6 +22,7 @@ import kotlinx.coroutines.launch import org.apache.tuweni.devp2p.eth.EthRequestsManager import org.apache.tuweni.eth.BlockHeader import org.apache.tuweni.eth.repository.BlockchainRepository +import org.apache.tuweni.units.bigints.UInt256 import java.util.concurrent.ExecutorService import java.util.concurrent.Executors import kotlin.coroutines.CoroutineContext @@ -34,8 +35,10 @@ class FromUnknownParentSynchronizer( coroutineContext: CoroutineContext = executor.asCoroutineDispatcher(), repository: BlockchainRepository, client: EthRequestsManager, - peerRepository: EthereumPeerRepository -) : Synchronizer(executor, coroutineContext, repository, client, peerRepository) { + peerRepository: EthereumPeerRepository, + from: UInt256?, + to: UInt256? +) : Synchronizer(executor, coroutineContext, repository, client, peerRepository, from, to) { var listenerId: String? = null @@ -54,6 +57,9 @@ class FromUnknownParentSynchronizer( if (header.number.isZero) { return } + if ((null != from && header.number < from) || (null != to && header.number > to)) { + return + } val parentHash = header.parentHash ?: return launch { delay(DELAY) diff --git a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/PeerStatusEthSynchronizer.kt b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/PeerStatusEthSynchronizer.kt index 1df2b15..0752b13 100644 --- a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/PeerStatusEthSynchronizer.kt +++ b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/PeerStatusEthSynchronizer.kt @@ -21,6 +21,7 @@ import kotlinx.coroutines.launch import org.apache.tuweni.devp2p.eth.EthRequestsManager import org.apache.tuweni.eth.Hash import org.apache.tuweni.eth.repository.BlockchainRepository +import org.apache.tuweni.units.bigints.UInt256 import java.util.concurrent.ExecutorService import java.util.concurrent.Executors import kotlin.coroutines.CoroutineContext @@ -39,8 +40,10 @@ class PeerStatusEthSynchronizer( repository: BlockchainRepository, client: EthRequestsManager, peerRepository: EthereumPeerRepository, - private val adapter: WireConnectionPeerRepositoryAdapter -) : Synchronizer(executor, coroutineContext, repository, client, peerRepository) { + private val adapter: WireConnectionPeerRepositoryAdapter, + from: UInt256?, + to: UInt256? +) : Synchronizer(executor, coroutineContext, repository, client, peerRepository, from, to) { var listenerId: String? = null diff --git a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/Synchronizer.kt b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/Synchronizer.kt index e0db77d..2cac6a3 100644 --- a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/Synchronizer.kt +++ b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/Synchronizer.kt @@ -25,6 +25,7 @@ import org.apache.tuweni.devp2p.eth.EthRequestsManager import org.apache.tuweni.eth.BlockHeader import org.apache.tuweni.eth.Hash import org.apache.tuweni.eth.repository.BlockchainRepository +import org.apache.tuweni.units.bigints.UInt256 import org.slf4j.LoggerFactory import java.util.concurrent.ExecutorService import java.util.concurrent.Executors @@ -37,7 +38,9 @@ abstract class Synchronizer( override val coroutineContext: CoroutineContext = executor.asCoroutineDispatcher(), val repository: BlockchainRepository, val client: EthRequestsManager, - val peerRepository: EthereumPeerRepository + val peerRepository: EthereumPeerRepository, + val from: UInt256?, + val to: UInt256? ) : CoroutineScope { abstract fun start() abstract fun stop() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@tuweni.apache.org For additional commands, e-mail: commits-h...@tuweni.apache.org