Bittorrent binding interface needs to ignore loopback and broadcast interface.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/02c27a9a Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/02c27a9a Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/02c27a9a Branch: refs/heads/nimbus-ha-branch Commit: 02c27a9a84813082fe30e6ffcd52425f85b99176 Parents: fa69f2a Author: Parth Brahmbhatt <[email protected]> Authored: Mon Dec 15 16:30:21 2014 -0800 Committer: Parth Brahmbhatt <[email protected]> Committed: Mon Dec 15 16:30:21 2014 -0800 ---------------------------------------------------------------------- .../torrent/BitTorrentCodeDistributor.java | 79 +++++++++++--------- 1 file changed, 45 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/02c27a9a/storm-core/src/jvm/backtype/storm/torrent/BitTorrentCodeDistributor.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/torrent/BitTorrentCodeDistributor.java b/storm-core/src/jvm/backtype/storm/torrent/BitTorrentCodeDistributor.java index 4fe45a6..f4512b4 100644 --- a/storm-core/src/jvm/backtype/storm/torrent/BitTorrentCodeDistributor.java +++ b/storm-core/src/jvm/backtype/storm/torrent/BitTorrentCodeDistributor.java @@ -9,6 +9,7 @@ import com.turn.ttorrent.client.SharedTorrent; import com.turn.ttorrent.common.Torrent; import com.turn.ttorrent.tracker.TrackedTorrent; import com.turn.ttorrent.tracker.Tracker; +import org.apache.commons.io.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -17,17 +18,14 @@ import java.io.FileOutputStream; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.URI; -import java.nio.file.CopyOption; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; +import java.net.UnknownHostException; import java.util.*; public class BitTorrentCodeDistributor implements ICodeDistributor { private static final Logger LOG = LoggerFactory.getLogger(BitTorrentCodeDistributor.class); private Tracker tracker; private String hostName; - private InetAddress host; + private InetSocketAddress address; private Integer port; protected HashMap<String, Client> clients = new HashMap<String, Client>(); protected Double maxDownload; @@ -37,18 +35,18 @@ public class BitTorrentCodeDistributor implements ICodeDistributor { @Override public void prepare(Map conf) throws Exception { this.hostName = InetAddress.getLocalHost().getCanonicalHostName(); - this.port = (Integer)conf.get(Config.BITTORRENT_PORT); - this.host = InetAddress.getLocalHost(); - this.maxDownload = (Double)conf.get(Config.BITTORRENT_MAX_DOWNLOAD_RATE); - this.maxUpload = (Double)conf.get(Config.BITTORRENT_MAX_UPLOAD_RATE); - this.seedDuration = (Integer)conf.get(Config.SUPERVISOR_BITTORRENT_SEED_DURATION); + this.port = (Integer) conf.get(Config.BITTORRENT_PORT); + this.maxDownload = (Double) conf.get(Config.BITTORRENT_MAX_DOWNLOAD_RATE); + this.maxUpload = (Double) conf.get(Config.BITTORRENT_MAX_UPLOAD_RATE); + this.seedDuration = (Integer) conf.get(Config.SUPERVISOR_BITTORRENT_SEED_DURATION); LOG.info("Download rates [U/D]: {}/{} kB/sec", format(this.maxUpload), format(this.maxDownload)); LOG.info("Starting bt tracker bound to hostname '{}'", hostName); - //using "::" to ensure we bind to all IPV4 and IPV6 network interfaces. - InetSocketAddress socketAddr = new InetSocketAddress("::", port); - this.tracker = new Tracker(socketAddr); + //using "0.0.0.0" to ensure we bind to all IPV4 network interfaces. + this.address = new InetSocketAddress("0.0.0.0", port); + + this.tracker = new Tracker(address); LOG.info("Announce URL: {}", this.tracker.getAnnounceUrl()); this.tracker.start(); } @@ -60,6 +58,8 @@ public class BitTorrentCodeDistributor implements ICodeDistributor { URI uri = URI.create("http://" + this.hostName + ":" + this.port + "/announce"); LOG.info("Creating torrent with announce URL: {}", uri); + + //TODO: why does listing the directory not work? ArrayList<File> files = new ArrayList<File>(); files.add(new File(destDir, "stormjar.jar")); files.add(new File(destDir, "stormconf.ser")); @@ -68,41 +68,42 @@ public class BitTorrentCodeDistributor implements ICodeDistributor { Torrent torrent = Torrent.create(destDir, files, uri, "storm-nimbus"); File torrentFile = new File(destDir, "storm-code-distributor.meta"); torrent.save(new FileOutputStream(torrentFile)); - LOG.info("Saved torrent: {}" + torrentFile.getAbsolutePath()); + LOG.info("Saved torrent: {}", torrentFile.getAbsolutePath()); this.tracker.announce(new TrackedTorrent(torrent)); - LOG.info("Torrent announced to tracker."); - Client client = new Client(host, new SharedTorrent(torrent, destDir.getParentFile(), true)); + + Client client = new Client(getInetAddress(), new SharedTorrent(torrent, destDir.getParentFile(), true)); this.clients.put(topologyId, client); rebalanceRates(); client.share(); LOG.info("Seeding torrent..."); + /** + * Every time on prepare we need to call tracker.announce for all torrents that + * exists in the file system, other wise the tracker will reject any peer request + * with unknown torrents. You need to bootstrap trackers. + */ return torrentFile; } @Override public List<File> download(String topologyId, File torrentFile) throws Exception { LOG.info("Initiating BitTorrent download."); - InetAddress netAddr = InetAddress.getLocalHost(); - //TODO: This should be configured, the assumption that the files should be downloaded - //in parent folder is probably not best one. File destDir = torrentFile.getParentFile(); LOG.info("Downloading with torrent file: {}", torrentFile.getAbsolutePath()); LOG.info("Saving files to directory: {}", destDir.getAbsolutePath()); SharedTorrent st = SharedTorrent.fromFile(torrentFile, destDir); - - Client client = new Client(netAddr, st); + Client client = new Client(getInetAddress(), st); this.clients.put(topologyId, client); rebalanceRates(); client.share(this.seedDuration); //TODO: Should have a timeout after which we just fail the supervisor. - if(this.seedDuration == 0) { + if (this.seedDuration == 0) { client.waitForCompletion(); } else { LOG.info("Waiting for seeding to begin..."); - while(client.getState() != Client.ClientState.SEEDING && client.getState() != Client.ClientState.ERROR){ + while (client.getState() != Client.ClientState.SEEDING && client.getState() != Client.ClientState.ERROR) { try { Thread.sleep(10); } catch (InterruptedException e) { @@ -120,9 +121,9 @@ public class BitTorrentCodeDistributor implements ICodeDistributor { * given folder only and no extra folder needs to be created. */ - File srcDir = Paths.get(destDir.getPath(), topologyId).toFile(); - for(File file : srcDir.listFiles()) { - Files.copy(file.toPath(), destDir.toPath().resolve(file.getName())); + File srcDir = new File(destDir, topologyId); + for (File file : srcDir.listFiles()) { + FileUtils.copyFileToDirectory(file, destDir); file.delete(); } srcDir.delete(); @@ -130,15 +131,25 @@ public class BitTorrentCodeDistributor implements ICodeDistributor { return Lists.newArrayList(destDir.listFiles()); } + private InetAddress getInetAddress() throws UnknownHostException { + for (InetAddress addr : InetAddress.getAllByName(this.hostName)) { + if (!addr.isAnyLocalAddress() && !addr.isLoopbackAddress() && !addr.isMulticastAddress()) { + return addr; + } + } + + throw new RuntimeException("No valid InetAddress could be obtained, something really wrong with network configuration.") + } + @Override public short getReplicationCount(String topologyId) { Collection<TrackedTorrent> trackedTorrents = tracker.getTrackedTorrents(); - //TODO: this needs to be tested. - for(TrackedTorrent trackedTorrent: trackedTorrents) { - if(trackedTorrent.getName().equals(topologyId)) { + for (final TrackedTorrent trackedTorrent : trackedTorrents) { + if (trackedTorrent.getName().equals(topologyId)) { return Shorts.checkedCast(trackedTorrent.seeders()); } } + LOG.warn("No torrent found in tracker for topologyId = " + topologyId); return 0; } @@ -147,7 +158,7 @@ public class BitTorrentCodeDistributor implements ICodeDistributor { public void cleanup(String topologyId) { LOG.info("Stop seeding/tracking for topology {}", topologyId); Client client = this.clients.remove(topologyId); - if(client != null){ + if (client != null) { Torrent torrent = client.getTorrent(); client.stop(); this.tracker.remove(torrent); @@ -160,21 +171,21 @@ public class BitTorrentCodeDistributor implements ICodeDistributor { this.tracker.stop(); } - private synchronized void rebalanceRates(){ + private synchronized void rebalanceRates() { int clientCount = this.clients.size(); - if(clientCount > 0){ + if (clientCount > 0) { double maxDl = this.maxDownload <= 0.0 ? this.maxDownload : this.maxDownload / clientCount; double maxUl = this.maxUpload <= 0.0 ? this.maxUpload : this.maxUpload / clientCount; LOG.info("Rebalancing bandwidth allocation based on {} topology torrents.", clientCount); LOG.info("Per-torrent allocation [D/U]: {}/{} kB/sec.", format(maxDl), format(maxUl)); - for(Client client : this.clients.values()) { + for (Client client : this.clients.values()) { client.setMaxDownloadRate(maxDl); client.setMaxUploadRate(maxUl); } } } - private static String format(double val){ + private static String format(double val) { return val <= 0.0 ? "UNLIMITED" : String.format("%.2f", val); } }
