This is an automated email from the ASF dual-hosted git repository.

toulmean pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git

commit a34e8813fd717173c956f1e2965c234af0948f23
Author: Antoine Toulme <anto...@lunar-ocean.com>
AuthorDate: Fri Oct 23 00:23:29 2020 -0700

    Add an experimental discv4 scraper
---
 devp2p/build.gradle                                |  12 ++
 .../main/kotlin/org/apache/tuweni/devp2p/Peer.kt   |   5 +
 .../kotlin/org/apache/tuweni/devp2p/Scraper.kt     | 133 +++++++++++++++++++++
 3 files changed, 150 insertions(+)

diff --git a/devp2p/build.gradle b/devp2p/build.gradle
index 563ebe1..090c522 100644
--- a/devp2p/build.gradle
+++ b/devp2p/build.gradle
@@ -46,3 +46,15 @@ application {
   mainClassName = 'org.apache.tuweni.devp2p.v5.ScraperApp'
   applicationName = 'scraper'
 }
+
+// Generates start scripts named "v4scraper" that launch org.apache.tuweni.devp2p.ScraperApp.
+// The scripts are written to the "scripts" directory under the project build directory and
+// reuse the classpath of the existing startScripts task.
+task v4ScraperApp(type: CreateStartScripts) {
+  mainClassName = "org.apache.tuweni.devp2p.ScraperApp"
+  applicationName = "v4scraper"
+  outputDir = new File(project.buildDir, 'scripts')
+  classpath = project.startScripts.classpath
+}
+
+// Copies the output of the v4ScraperApp task into the distribution's "bin" directory
+// with file mode 0755.
+applicationDistribution.into("bin") {
+  from(v4ScraperApp)
+  fileMode = 0755
+}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/Peer.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/Peer.kt
index 308c408..c97a550 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/Peer.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/Peer.kt
@@ -104,4 +104,9 @@ interface Peer {
    * @param time the time this endpoint information was determined, in milliseconds since the epoch
    */
   fun updateENR(record: EthereumNodeRecord, time: Long)
+
+  /**
+   * Formats this peer as an enode URI of the form
+   * `enode://<node id hex>@<host address>:<tcp port>?discPort=<udp port>`.
+   *
+   * The TCP port falls back to 30303 when the endpoint does not provide one.
+   *
+   * @return the enode URI describing this peer
+   */
+  fun uri(): String {
+    // Resolve the default outside the template so the string is built on a single line and
+    // the returned expression starts on the same line as `return`.
+    val tcpPort = endpoint.tcpPort ?: 30303
+    return "enode://${nodeId.toHexString()}@${endpoint.address.hostAddress}:$tcpPort?discPort=${endpoint.udpPort}"
+  }
 }
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/Scraper.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/Scraper.kt
new file mode 100644
index 0000000..6e4a10a
--- /dev/null
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/Scraper.kt
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.devp2p
+
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.asCoroutineDispatcher
+import kotlinx.coroutines.async
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.runBlocking
+import org.apache.tuweni.concurrent.ExpiringSet
+import org.apache.tuweni.concurrent.coroutines.await
+import org.apache.tuweni.crypto.SECP256K1
+import org.bouncycastle.jce.provider.BouncyCastleProvider
+import java.net.InetSocketAddress
+import java.net.URI
+import java.security.Security
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.Executors
+import java.util.concurrent.atomic.AtomicBoolean
+import kotlin.coroutines.CoroutineContext
+
+/**
+ * Command-line entry point that runs the discv4 [Scraper].
+ */
+object ScraperApp {
+
+  /**
+   * Registers the BouncyCastle security provider, then delegates to [run].
+   *
+   * @param args program arguments, forwarded unchanged to [run]
+   */
+  @JvmStatic
+  fun main(args: Array<String>) {
+    Security.addProvider(BouncyCastleProvider())
+    run(args)
+  }
+
+  /**
+   * Starts a scraper bound to 0.0.0.0:11000 and prints every peer whose URI has not been printed before.
+   *
+   * A JVM shutdown hook stops the scraper; the calling thread blocks until the scraper's start job completes.
+   *
+   * @param args bootstrap URIs, one per argument
+   */
+  fun run(args: Array<String>) {
+    val bootstrapURIs = args.map(URI::create)
+    val printedURIs = ConcurrentHashMap.newKeySet<String>()
+
+    // Print a peer only the first time its URI is seen, across all listener invocations.
+    val printUnseenPeers: (Peer, List<Peer>) -> Unit = { _, discovered ->
+      discovered.forEach { peer ->
+        if (printedURIs.add(peer.uri())) {
+          println(peer)
+        }
+      }
+    }
+
+    val scraper = Scraper(
+      initialURIs = bootstrapURIs,
+      bindAddress = InetSocketAddress("0.0.0.0", 11000),
+      listeners = listOf(printUnseenPeers)
+    )
+
+    val stopOnExit = Thread {
+      runBlocking { scraper.stop().await() }
+    }
+    Runtime.getRuntime().addShutdownHook(stopOnExit)
+
+    runBlocking { scraper.start().await() }
+  }
+}
+
+/**
+ * Discovery scraper that will continue asking peers for peers, and iterate over them, until told to stop.
+ *
+ * The scraper sends events of discoveries to listeners.
+ *
+ * @param coroutineContext context in which the scraper's coroutines run
+ * @param initialURIs bootstrap URIs handed to the discovery service
+ * @param bindAddress local address the discovery service binds to
+ * @param listeners callbacks invoked with a queried peer and the non-empty list of peers it returned
+ * @param maxWaitForNewPeers seconds to wait for lookup results after each round of lookups
+ * @param waitBetweenScrapes seconds to pause between two scrapes
+ */
+class Scraper(
+  override val coroutineContext: CoroutineContext = Executors.newFixedThreadPool(100).asCoroutineDispatcher(),
+  val initialURIs: List<URI>,
+  val bindAddress: InetSocketAddress,
+  val listeners: List<(Peer, List<Peer>) -> Unit>,
+  val maxWaitForNewPeers: Long = 20,
+  val waitBetweenScrapes: Long = 5 * 60
+) : CoroutineScope {
+
+  // Written by start() and stop() and read by discover(), which may run on different threads.
+  @Volatile
+  private var service: DiscoveryService? = null
+  private val started = AtomicBoolean(false)
+
+  /**
+   * Opens the discovery service with a random key pair, then alternates between a [discover] pass and a pause of
+   * [waitBetweenScrapes] seconds for as long as the scraper is marked as started.
+   *
+   * @return a deferred that completes once the loop observes that the scraper was stopped
+   */
+  fun start() = async {
+    val newService = DiscoveryService.open(
+      keyPair = SECP256K1.KeyPair.random(),
+      bindAddress = bindAddress,
+      bootstrapURIs = initialURIs
+    )
+    service = newService
+    started.set(true)
+    while (started.get()) {
+      discover(maxWaitForNewPeers).await()
+      delay(waitBetweenScrapes * 1000)
+    }
+  }
+
+  /**
+   * Runs one scrape: repeatedly looks up every known peer and keeps going for as long as a round yields peers that
+   * were not known before.
+   *
+   * Listeners are notified, each in its own coroutine, with the queried peer and the peers it returned. Peers
+   * found by the initial seeding lookup are only added to the working set, since no queried peer exists for them.
+   *
+   * @param maxWaitForNewPeers seconds to wait after issuing a round of lookups before checking for new peers
+   * @return a deferred that completes when a round finds no new peer
+   */
+  fun discover(maxWaitForNewPeers: Long) = async {
+    val nodes = ExpiringSet<Peer>(24 * 60 * 60 * 1000)
+    // Lookup callbacks may complete on another thread, so the flag has to be an atomic rather than a local var.
+    val newPeersDetected = AtomicBoolean(true)
+    while (newPeersDetected.getAndSet(false)) {
+      if (nodes.none()) {
+        // The working set starts out empty: without a seed the loop below would never issue a lookup.
+        // NOTE(review): a lookup issued before the service finished bootstrapping may return nothing,
+        // in which case this scrape ends early and the next scrape seeds again.
+        service?.lookupAsync(SECP256K1.KeyPair.random().publicKey())?.thenAccept { seeds ->
+          for (seed in seeds) {
+            if (nodes.add(seed)) {
+              newPeersDetected.set(true)
+            }
+          }
+        }
+      }
+      for (node in nodes) {
+        service?.lookupAsync(node.nodeId)?.thenAccept { found ->
+          if (found.isNotEmpty()) {
+            for (listener in listeners) {
+              launch {
+                listener(node, found)
+              }
+            }
+            for (newPeer in found) {
+              if (nodes.add(newPeer)) {
+                newPeersDetected.set(true)
+              }
+            }
+          }
+        }
+      }
+      delay(maxWaitForNewPeers * 1000)
+    }
+  }
+
+  /**
+   * Stops the scraper if it is running: clears the started flag, requests shutdown of the discovery service and
+   * waits for it to terminate. Calling this when the scraper is not started does nothing.
+   *
+   * @return a deferred that completes once the discovery service has terminated
+   */
+  fun stop() = async {
+    if (started.compareAndSet(true, false)) {
+      val s = service
+      service = null
+      if (s != null) {
+        // Termination has to be requested first; awaiting alone would wait on a service that keeps running.
+        s.shutdown()
+        s.awaitTermination()
+      }
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@tuweni.apache.org
For additional commands, e-mail: commits-h...@tuweni.apache.org

Reply via email to