GIRAPH-925: Unit tests should pass even if zookeeper port not available (edunov via pavanka)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/7f9218ae Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/7f9218ae Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/7f9218ae Branch: refs/heads/release-1.1 Commit: 7f9218aeb6410929ddada81b4fabb17bf8636a4c Parents: 4223ccc Author: Pavan Kumar <[email protected]> Authored: Tue Jul 8 10:58:09 2014 -0700 Committer: Pavan Kumar <[email protected]> Committed: Tue Jul 8 10:59:50 2014 -0700 ---------------------------------------------------------------------- CHANGELOG | 2 + .../giraph/utils/InternalVertexRunner.java | 63 ++++++++++++++++---- 2 files changed, 54 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/7f9218ae/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index ea2f911..834b45f 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 1.1.0 - unreleased + GIRAPH-925: Unit tests should pass even if zookeeper port not available (edunov via pavanka) + GIRAPH-713: Provide an option to do request compression (pavanka) GIRAPH-923: Upgrade Netty version to a latest stable one (pavanka) http://git-wip-us.apache.org/repos/asf/giraph/blob/7f9218ae/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java index 09dd46d..2c4606f 100644 --- a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java +++ b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java @@ -38,6 +38,7 @@ import org.apache.zookeeper.server.quorum.QuorumPeerConfig; import java.io.File; import java.io.IOException; +import java.net.ServerSocket; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -55,8 +56,10 @@ import java.util.concurrent.TimeUnit; */ @SuppressWarnings("unchecked") public class InternalVertexRunner { - /** ZooKeeper port to use for tests */ - public static final int LOCAL_ZOOKEEPER_PORT = 22182; + /** Range of ZooKeeper ports to use for tests */ + public static final int LOCAL_ZOOKEEPER_PORT_FROM = 22182; + /** Range of ZooKeeper ports to use for tests */ + public static final int LOCAL_ZOOKEEPER_PORT_TO = 65535; /** Logger */ private static final Logger LOG = @@ -166,11 +169,13 @@ public class InternalVertexRunner { FileUtils.writeLines(edgeInputFile, edgeInputData); } + int localZookeeperPort = findAvailablePort(); + conf.setWorkerConfiguration(1, 1, 100.0f); GiraphConstants.SPLIT_MASTER_WORKER.set(conf, false); GiraphConstants.LOCAL_TEST_MODE.set(conf, true); conf.setZookeeperList("localhost:" + - String.valueOf(LOCAL_ZOOKEEPER_PORT)); + String.valueOf(localZookeeperPort)); conf.set(GiraphConstants.ZOOKEEPER_DIR, zkDir.toString()); GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY.set(conf, @@ -190,10 +195,10 @@ public class InternalVertexRunner { new Path(edgeInputFile.toString())); } FileOutputFormat.setOutputPath(job.getInternalJob(), - new Path(outputDir.toString())); + new Path(outputDir.toString())); // Configure a local zookeeper instance - Properties zkProperties = configLocalZooKeeper(zkDir); + Properties zkProperties = configLocalZooKeeper(zkDir, localZookeeperPort); QuorumPeerConfig qpConfig = new QuorumPeerConfig(); qpConfig.parseProperties(zkProperties); @@ -227,8 +232,8 @@ public class InternalVertexRunner { * @throws Exception if anything goes wrong */ public static <I extends WritableComparable, - V extends Writable, - E extends Writable> void run( + V extends Writable, + E extends Writable> void run( GiraphConfiguration conf, TestGraph<I, V, E> graph) throws Exception { File tmpDir = null; @@ -247,11 +252,13 @@ public class InternalVertexRunner { InMemoryVertexInputFormat.setGraph(graph); + int localZookeeperPort = findAvailablePort(); + conf.setWorkerConfiguration(1, 1, 100.0f); GiraphConstants.SPLIT_MASTER_WORKER.set(conf, false); GiraphConstants.LOCAL_TEST_MODE.set(conf, true); GiraphConstants.ZOOKEEPER_LIST.set(conf, "localhost:" + - String.valueOf(LOCAL_ZOOKEEPER_PORT)); + String.valueOf(localZookeeperPort)); conf.set(GiraphConstants.ZOOKEEPER_DIR, zkDir.toString()); GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY.set(conf, @@ -259,7 +266,7 @@ public class InternalVertexRunner { GiraphConstants.CHECKPOINT_DIRECTORY.set(conf, checkpointsDir.toString()); // Configure a local zookeeper instance - Properties zkProperties = configLocalZooKeeper(zkDir); + Properties zkProperties = configLocalZooKeeper(zkDir, localZookeeperPort); QuorumPeerConfig qpConfig = new QuorumPeerConfig(); qpConfig.parseProperties(zkProperties); @@ -298,14 +305,16 @@ public class InternalVertexRunner { * Configuration options for running local ZK. * * @param zkDir directory for ZK to hold files in. + * @param zookeeperPort port zookeeper will listen on * @return Properties configured for local ZK. */ - private static Properties configLocalZooKeeper(File zkDir) { + private static Properties configLocalZooKeeper(File zkDir, + int zookeeperPort) { Properties zkProperties = new Properties(); zkProperties.setProperty("tickTime", "2000"); zkProperties.setProperty("dataDir", zkDir.getAbsolutePath()); zkProperties.setProperty("clientPort", - String.valueOf(LOCAL_ZOOKEEPER_PORT)); + String.valueOf(zookeeperPort)); zkProperties.setProperty("maxClientCnxns", "10000"); zkProperties.setProperty("minSessionTimeout", "10000"); zkProperties.setProperty("maxSessionTimeout", "100000"); @@ -316,6 +325,38 @@ public class InternalVertexRunner { } /** + * Scans for available port. Returns first port where + * we can open server socket. + * Note: if another process opened port with SO_REUSEPORT then this + * function may return port that is in use. It actually happens + * with NetCat on Mac. + * @return available port + */ + private static int findAvailablePort() { + for (int port = LOCAL_ZOOKEEPER_PORT_FROM; + port < LOCAL_ZOOKEEPER_PORT_TO; port++) { + ServerSocket ss = null; + try { + ss = new ServerSocket(port); + ss.setReuseAddress(true); + return port; + } catch (IOException e) { + LOG.info("findAvailablePort: port " + port + " is in use."); + } finally { + if (ss != null && !ss.isClosed()) { + try { + ss.close(); + } catch (IOException e) { + LOG.info("findAvailablePort: can't close test socket", e); + } + } + } + } + throw new RuntimeException("No port found in the range [ " + + LOCAL_ZOOKEEPER_PORT_FROM + ", " + LOCAL_ZOOKEEPER_PORT_TO + ")"); + } + + /** * Extension of {@link ZooKeeperServerMain} that allows programmatic shutdown */ private static class InternalZooKeeper extends ZooKeeperServerMain {
