This is an automated email from the ASF dual-hosted git repository.

toulmean pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git


The following commit(s) were added to refs/heads/main by this push:
     new 7f23059  Make synchronizers configurable
     new fa88db3  Merge pull request #369 from atoulme/synchronizer_config
7f23059 is described below

commit 7f23059aa20df0edc4f4a1ea811198fe8296ebab
Author: Antoine Toulme <anto...@lunar-ocean.com>
AuthorDate: Mon Jan 24 22:19:53 2022 -0800

    Make synchronizers configurable
---
 .../apache/tuweni/config/PropertyValidator.java    | 15 ++++
 .../tuweni/config/PropertyValidatorTest.java       |  8 ++
 .../org/apache/tuweni/ethclient/EthereumClient.kt  | 75 +++++++++++--------
 .../tuweni/ethclient/EthereumClientConfig.kt       | 85 ++++++++++++++++++++--
 ...nizer.kt => FromBestBlockHeaderSynchronizer.kt} | 10 ++-
 .../ethclient/FromUnknownParentSynchronizer.kt     | 10 ++-
 .../tuweni/ethclient/PeerStatusEthSynchronizer.kt  |  7 +-
 .../org/apache/tuweni/ethclient/Synchronizer.kt    |  5 +-
 8 files changed, 172 insertions(+), 43 deletions(-)

diff --git 
a/config/src/main/java/org/apache/tuweni/config/PropertyValidator.java 
b/config/src/main/java/org/apache/tuweni/config/PropertyValidator.java
index c59c01d..7cbc3d3 100644
--- a/config/src/main/java/org/apache/tuweni/config/PropertyValidator.java
+++ b/config/src/main/java/org/apache/tuweni/config/PropertyValidator.java
@@ -103,6 +103,21 @@ public interface PropertyValidator<T> {
   }
 
   /**
+   * A validator that ensures a property, if present, is greater than or equal to the given value.
+   *
+   * @param value The lower bound (inclusive).
+   * @return A validator that reports a single error when a present property is less than {@code value}.
+   */
+  static PropertyValidator<Number> isGreaterOrEqual(long value) {
+    return (key, position, currentValue) -> {
+      if (currentValue != null && (currentValue.longValue() < value)) {
+        return singleError(position, "Value of property '" + key + "' is less 
than '" + value + "'");
+      }
+      return noErrors();
+    };
+  }
+
+  /**
    * A validator that ensures a property, if present, has a value within a 
given set.
    *
    * @param values The acceptable values.
diff --git 
a/config/src/test/java/org/apache/tuweni/config/PropertyValidatorTest.java 
b/config/src/test/java/org/apache/tuweni/config/PropertyValidatorTest.java
index 17e44cf..01a63e0 100644
--- a/config/src/test/java/org/apache/tuweni/config/PropertyValidatorTest.java
+++ b/config/src/test/java/org/apache/tuweni/config/PropertyValidatorTest.java
@@ -81,4 +81,12 @@ class PropertyValidatorTest {
     assertEquals(1, errors.size());
     assertEquals("Value of property 'foo' should be \"one\", \"two\", or 
\"three \"", errors.get(0).getMessage());
   }
+
+  @Test
+  void validatesEqualOrGreater() {
+    PropertyValidator<Number> longPropertyValidator = 
PropertyValidator.isGreaterOrEqual(32L);
+    assertTrue(longPropertyValidator.validate("foo", null, 33L).isEmpty());
+    assertTrue(longPropertyValidator.validate("foo", null, 32L).isEmpty());
+    assertEquals(1, longPropertyValidator.validate("foo", null, 31L).size());
+  }
 }
diff --git 
a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClient.kt 
b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClient.kt
index 9f1a958..6bd6b93 100644
--- a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClient.kt
+++ b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClient.kt
@@ -74,13 +74,13 @@ class EthereumClient(
   }
 
   private var metricsService: MetricsService? = null
-  private val genesisFiles = HashMap<String, GenesisFile>()
-  private val services = HashMap<String, RLPxService>()
-  private val storageRepositories = HashMap<String, BlockchainRepository>()
-  val peerRepositories = HashMap<String, EthereumPeerRepository>()
-  private val dnsClients = HashMap<String, DNSClient>()
-  private val discoveryServices = HashMap<String, DiscoveryService>()
-  private val synchronizers = HashMap<String, Synchronizer>()
+  private val genesisFiles = mutableMapOf<String, GenesisFile>()
+  private val services = mutableMapOf<String, RLPxService>()
+  private val storageRepositories = mutableMapOf<String, 
BlockchainRepository>()
+  val peerRepositories = mutableMapOf<String, EthereumPeerRepository>()
+  private val dnsClients = mutableMapOf<String, DNSClient>()
+  private val discoveryServices = mutableMapOf<String, DiscoveryService>()
+  private val synchronizers = mutableMapOf<String, Synchronizer>()
 
   private val managerHandler = mutableListOf<DefaultCacheManager>()
 
@@ -272,28 +272,45 @@ class EthereumClient(
             }
           }
 
-          val synchronizer = PeerStatusEthSynchronizer(
-            repository = repository,
-            client = service.getClient(ETH66) as EthRequestsManager,
-            peerRepository = peerRepository,
-            adapter = adapter
-          )
-          synchronizers[rlpxConfig.getName() + "status"] = synchronizer
-          synchronizer.start()
-          val parentSynchronizer = FromUnknownParentSynchronizer(
-            repository = repository,
-            client = service.getClient(ETH66) as EthRequestsManager,
-            peerRepository = peerRepository
-          )
-          synchronizers[rlpxConfig.getName() + "parent"] = parentSynchronizer
-          parentSynchronizer.start()
-          val bestSynchronizer = FromBestBlockSynchronizer(
-            repository = repository,
-            client = service.getClient(ETH66) as EthRequestsManager,
-            peerRepository = peerRepository
-          )
-          synchronizers[rlpxConfig.getName() + "best"] = bestSynchronizer
-          bestSynchronizer.start()
+          for (sync in config.synchronizers()) {
+            when (sync.getType()) {
+              SynchronizerType.best -> {
+                val bestSynchronizer = FromBestBlockHeaderSynchronizer(
+                  repository = repository,
+                  client = service.getClient(ETH66) as EthRequestsManager,
+                  peerRepository = peerRepository,
+                  from = sync.getFrom(),
+                  to = sync.getTo(),
+                )
+                bestSynchronizer.start()
+                synchronizers[sync.getName()] = bestSynchronizer
+              }
+              SynchronizerType.status -> {
+                val synchronizer = PeerStatusEthSynchronizer(
+                  repository = repository,
+                  client = service.getClient(ETH66) as EthRequestsManager,
+                  peerRepository = peerRepository,
+                  adapter = adapter,
+                  from = sync.getFrom(),
+                  to = sync.getTo(),
+                )
+                synchronizer.start()
+                synchronizers[sync.getName()] = synchronizer
+              }
+              SynchronizerType.parent -> {
+                val parentSynchronizer = FromUnknownParentSynchronizer(
+                  repository = repository,
+                  client = service.getClient(ETH66) as EthRequestsManager,
+                  peerRepository = peerRepository,
+                  from = sync.getFrom(),
+                  to = sync.getTo(),
+                )
+                parentSynchronizer.start()
+                synchronizers[sync.getName()] = parentSynchronizer
+              }
+            }
+          }
+
           logger.info("Finished configuring Ethereum client 
${rlpxConfig.getName()}")
         }
       }
diff --git 
a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClientConfig.kt
 
b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClientConfig.kt
index 64c2d76..83833ea 100644
--- 
a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClientConfig.kt
+++ 
b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClientConfig.kt
@@ -26,6 +26,7 @@ import org.apache.tuweni.config.SchemaBuilder
 import org.apache.tuweni.crypto.SECP256K1
 import org.apache.tuweni.devp2p.parseEnodeUri
 import org.apache.tuweni.eth.genesis.GenesisFile
+import org.apache.tuweni.units.bigints.UInt256
 import org.slf4j.LoggerFactory
 import java.io.FileNotFoundException
 import java.net.URI
@@ -186,6 +187,21 @@ class EthereumClientConfig(private var config: 
Configuration = Configuration.emp
     }
   }
 
+  /**
+   * Lists the configured synchronizers, one per `synchronizer.<name>` section (empty when none are declared).
+   */
+  fun synchronizers(): List<SynchronizerConfiguration> =
+    config.sections("synchronizer").orEmpty().map { section ->
+      // Resolve the section under the same "synchronizer" prefix it was listed with.
+      val sectionConfig = config.getConfigurationSection("synchronizer.$section")
+      SynchronizerConfigurationImpl(
+        section,
+        SynchronizerType.valueOf(sectionConfig.getString("type")),
+        UInt256.valueOf(sectionConfig.getLong("from")),
+        UInt256.valueOf(sectionConfig.getLong("to")),
+      )
+    }
+
   fun validate(): Stream<ConfigurationError> {
     val schema = createSchema()
     var errors = schema.validate(this.config)
@@ -205,7 +221,10 @@ class EthereumClientConfig(private var config: 
Configuration = Configuration.emp
   private fun validateSubsection(name: String, schema: Schema): 
Stream<ConfigurationError> {
     var errors = listOf<ConfigurationError>().stream()
     for (subSection in this.config.sections(name)) {
-      errors = Stream.concat(errors, 
schema.getSubSection(name).validate(this.config.getConfigurationSection("$name.$subSection")))
+      errors = Stream.concat(
+        errors,
+        
schema.getSubSection(name).validate(this.config.getConfigurationSection("$name.$subSection"))
+      )
     }
     return errors
   }
@@ -250,7 +269,12 @@ class EthereumClientConfig(private var config: 
Configuration = Configuration.emp
       val discoverySection = SchemaBuilder.create()
       discoverySection.addString("identity", "", "Node identity", null)
       discoverySection.addString("networkInterface", "127.0.0.1", "Network 
interface to bind", null)
-      discoverySection.addInteger("port", 0, "Port to expose the discovery 
service on", PropertyValidator.isValidPortOrZero())
+      discoverySection.addInteger(
+        "port",
+        0,
+        "Port to expose the discovery service on",
+        PropertyValidator.isValidPortOrZero()
+      )
       discoverySection.addString("peerRepository", "default", "Peer repository 
to which records should go", null)
 
       val genesis = SchemaBuilder.create()
@@ -276,7 +300,12 @@ class EthereumClientConfig(private var config: 
Configuration = Configuration.emp
         }
       )
       rlpx.addInteger("port", 0, "Port to expose the RLPx service on", 
PropertyValidator.isValidPortOrZero())
-      rlpx.addInteger("advertisedPort", 30303, "Port to advertise in 
communications as the RLPx service port", PropertyValidator.isValidPort())
+      rlpx.addInteger(
+        "advertisedPort",
+        30303,
+        "Port to advertise in communications as the RLPx service port",
+        PropertyValidator.isValidPort()
+      )
       rlpx.addString("clientName", "Apache Tuweni", "Name of the Ethereum 
client", null)
       rlpx.addString("repository", "default", "Name of the blockchain 
repository", null)
       rlpx.addString("peerRepository", "default", "Peer repository to which 
records should go", null)
@@ -287,6 +316,17 @@ class EthereumClientConfig(private var config: 
Configuration = Configuration.emp
 
       val peerRepositoriesSection = SchemaBuilder.create()
       peerRepositoriesSection.addString("type", "memory", "Peer repository 
type", PropertyValidator.anyOf("memory"))
+
+      val synchronizersSection = SchemaBuilder.create()
+      synchronizersSection.addString(
+        "type",
+        "status",
+        "Synchronizer type",
+        PropertyValidator.anyOf("status", "parent", "best")
+      )
+      synchronizersSection.addLong("from", 0L, "Start block to sync from", 
PropertyValidator.isGreaterOrEqual(0L))
+      synchronizersSection.addLong("to", 0L, "End block to sync to", 
PropertyValidator.isGreaterOrEqual(0L))
+
       val builder = SchemaBuilder.create()
       builder.addSection("metrics", metricsSection.toSchema())
       builder.addSection("storage", storageSection.toSchema())
@@ -297,6 +337,7 @@ class EthereumClientConfig(private var config: 
Configuration = Configuration.emp
       builder.addSection("genesis", genesis.toSchema())
       builder.addSection("proxy", proxiesSection.toSchema())
       builder.addSection("peerRepository", peerRepositoriesSection.toSchema())
+      builder.addSection("synchronizer", synchronizersSection.toSchema())
 
       return builder.toSchema()
     }
@@ -379,7 +420,19 @@ interface PeerRepositoryConfiguration {
   fun getType(): String
 }
 
-internal class PeerRepositoryConfigurationImpl(private val repoName: String, 
private val type: String) : PeerRepositoryConfiguration {
+enum class SynchronizerType {
+  best, status, parent
+}
+
+interface SynchronizerConfiguration {
+  fun getName(): String
+  fun getType(): SynchronizerType
+  fun getFrom(): UInt256?
+  fun getTo(): UInt256?
+}
+
+internal class PeerRepositoryConfigurationImpl(private val repoName: String, 
private val type: String) :
+  PeerRepositoryConfiguration {
   override fun getName(): String = repoName
   override fun getType(): String = type
 }
@@ -414,7 +467,8 @@ internal data class RLPxServiceConfigurationImpl(
   override fun peerRepository(): String = peerRepository
 }
 
-internal data class GenesisFileConfigurationImpl(private val name: String, 
private val genesisFilePath: URI) : GenesisFileConfiguration {
+internal data class GenesisFileConfigurationImpl(private val name: String, 
private val genesisFilePath: URI) :
+  GenesisFileConfiguration {
   override fun getName(): String = name
 
   override fun genesisFile(): GenesisFile =
@@ -470,16 +524,33 @@ data class DiscoveryConfigurationImpl(
   override fun getPort() = port
 }
 
-data class StaticPeersConfigurationImpl(private val enodes: List<String>, 
private val peerRepository: String) : StaticPeersConfiguration {
+data class StaticPeersConfigurationImpl(private val enodes: List<String>, 
private val peerRepository: String) :
+  StaticPeersConfiguration {
 
   override fun enodes(): List<String> = enodes
 
   override fun peerRepository() = peerRepository
 }
 
-data class ProxyConfigurationImpl(private val name: String, private val 
upstream: String, private val downstream: String) : ProxyConfiguration {
+data class ProxyConfigurationImpl(
+  private val name: String,
+  private val upstream: String,
+  private val downstream: String,
+) : ProxyConfiguration {
   override fun name() = name
 
   override fun upstream() = upstream
   override fun downstream() = downstream
 }
+
+data class SynchronizerConfigurationImpl(
+  private val name: String,
+  private val type: SynchronizerType,
+  private val from: UInt256?,
+  private val to: UInt256?,
+) : SynchronizerConfiguration {
+  override fun getName() = name
+  override fun getType() = type
+  override fun getFrom(): UInt256? = from
+  override fun getTo(): UInt256? = to
+}
diff --git 
a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/FromBestBlockSynchronizer.kt
 
b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/FromBestBlockHeaderSynchronizer.kt
similarity index 91%
rename from 
eth-client/src/main/kotlin/org/apache/tuweni/ethclient/FromBestBlockSynchronizer.kt
rename to 
eth-client/src/main/kotlin/org/apache/tuweni/ethclient/FromBestBlockHeaderSynchronizer.kt
index 4edb80a..d70a3bc 100644
--- 
a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/FromBestBlockSynchronizer.kt
+++ 
b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/FromBestBlockHeaderSynchronizer.kt
@@ -22,6 +22,7 @@ import kotlinx.coroutines.launch
 import org.apache.tuweni.devp2p.eth.EthRequestsManager
 import org.apache.tuweni.eth.BlockHeader
 import org.apache.tuweni.eth.repository.BlockchainRepository
+import org.apache.tuweni.units.bigints.UInt256
 import java.util.concurrent.ExecutorService
 import java.util.concurrent.Executors
 import kotlin.coroutines.CoroutineContext
@@ -29,13 +30,15 @@ import kotlin.coroutines.CoroutineContext
 const val BEST_PEER_DELAY: Long = 5000
 const val HEADERS_RESPONSE_TIMEOUT: Long = 10000
 
-class FromBestBlockSynchronizer(
+class FromBestBlockHeaderSynchronizer(
   executor: ExecutorService = Executors.newSingleThreadExecutor(),
   coroutineContext: CoroutineContext = executor.asCoroutineDispatcher(),
   repository: BlockchainRepository,
   client: EthRequestsManager,
   peerRepository: EthereumPeerRepository,
-) : Synchronizer(executor, coroutineContext, repository, client, 
peerRepository) {
+  from: UInt256?,
+  to: UInt256?,
+) : Synchronizer(executor, coroutineContext, repository, client, 
peerRepository, from, to) {
 
   override fun start() {
     launch {
@@ -51,6 +54,9 @@ class FromBestBlockSynchronizer(
   }
 
   private fun askNextBestHeaders(header: BlockHeader) {
+    if ((null != from && header.number < from) || (null != to && header.number 
> to)) {
+      return
+    }
     launch {
       if (peerRepository.activeConnections().count() == 0L) {
         askNextBestHeaders(header)
diff --git 
a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/FromUnknownParentSynchronizer.kt
 
b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/FromUnknownParentSynchronizer.kt
index 2e9b9c4..6222608 100644
--- 
a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/FromUnknownParentSynchronizer.kt
+++ 
b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/FromUnknownParentSynchronizer.kt
@@ -22,6 +22,7 @@ import kotlinx.coroutines.launch
 import org.apache.tuweni.devp2p.eth.EthRequestsManager
 import org.apache.tuweni.eth.BlockHeader
 import org.apache.tuweni.eth.repository.BlockchainRepository
+import org.apache.tuweni.units.bigints.UInt256
 import java.util.concurrent.ExecutorService
 import java.util.concurrent.Executors
 import kotlin.coroutines.CoroutineContext
@@ -34,8 +35,10 @@ class FromUnknownParentSynchronizer(
   coroutineContext: CoroutineContext = executor.asCoroutineDispatcher(),
   repository: BlockchainRepository,
   client: EthRequestsManager,
-  peerRepository: EthereumPeerRepository
-) : Synchronizer(executor, coroutineContext, repository, client, 
peerRepository) {
+  peerRepository: EthereumPeerRepository,
+  from: UInt256?,
+  to: UInt256?
+) : Synchronizer(executor, coroutineContext, repository, client, 
peerRepository, from, to) {
 
   var listenerId: String? = null
 
@@ -54,6 +57,9 @@ class FromUnknownParentSynchronizer(
     if (header.number.isZero) {
       return
     }
+    if ((null != from && header.number < from) || (null != to && header.number 
> to)) {
+      return
+    }
     val parentHash = header.parentHash ?: return
     launch {
       delay(DELAY)
diff --git 
a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/PeerStatusEthSynchronizer.kt
 
b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/PeerStatusEthSynchronizer.kt
index 1df2b15..0752b13 100644
--- 
a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/PeerStatusEthSynchronizer.kt
+++ 
b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/PeerStatusEthSynchronizer.kt
@@ -21,6 +21,7 @@ import kotlinx.coroutines.launch
 import org.apache.tuweni.devp2p.eth.EthRequestsManager
 import org.apache.tuweni.eth.Hash
 import org.apache.tuweni.eth.repository.BlockchainRepository
+import org.apache.tuweni.units.bigints.UInt256
 import java.util.concurrent.ExecutorService
 import java.util.concurrent.Executors
 import kotlin.coroutines.CoroutineContext
@@ -39,8 +40,10 @@ class PeerStatusEthSynchronizer(
   repository: BlockchainRepository,
   client: EthRequestsManager,
   peerRepository: EthereumPeerRepository,
-  private val adapter: WireConnectionPeerRepositoryAdapter
-) : Synchronizer(executor, coroutineContext, repository, client, 
peerRepository) {
+  private val adapter: WireConnectionPeerRepositoryAdapter,
+  from: UInt256?,
+  to: UInt256?
+) : Synchronizer(executor, coroutineContext, repository, client, 
peerRepository, from, to) {
 
   var listenerId: String? = null
 
diff --git 
a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/Synchronizer.kt 
b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/Synchronizer.kt
index e0db77d..2cac6a3 100644
--- a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/Synchronizer.kt
+++ b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/Synchronizer.kt
@@ -25,6 +25,7 @@ import org.apache.tuweni.devp2p.eth.EthRequestsManager
 import org.apache.tuweni.eth.BlockHeader
 import org.apache.tuweni.eth.Hash
 import org.apache.tuweni.eth.repository.BlockchainRepository
+import org.apache.tuweni.units.bigints.UInt256
 import org.slf4j.LoggerFactory
 import java.util.concurrent.ExecutorService
 import java.util.concurrent.Executors
@@ -37,7 +38,9 @@ abstract class Synchronizer(
   override val coroutineContext: CoroutineContext = 
executor.asCoroutineDispatcher(),
   val repository: BlockchainRepository,
   val client: EthRequestsManager,
-  val peerRepository: EthereumPeerRepository
+  val peerRepository: EthereumPeerRepository,
+  val from: UInt256?,
+  val to: UInt256?
 ) : CoroutineScope {
   abstract fun start()
   abstract fun stop()

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@tuweni.apache.org
For additional commands, e-mail: commits-h...@tuweni.apache.org

Reply via email to