Repository: storm Updated Branches: refs/heads/master fa25f3d7f -> 3b6813838
util port allocation conversion to java Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ae619f31 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ae619f31 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ae619f31 Branch: refs/heads/master Commit: ae619f31a85e2924172c0e7014e7fa03240a0da3 Parents: c2cf3be Author: Sanket <schintap@untilservice-lm> Authored: Mon Mar 14 12:26:50 2016 -0500 Committer: Sanket <schintap@untilservice-lm> Committed: Mon Mar 14 12:26:50 2016 -0500 ---------------------------------------------------------------------- storm-core/src/clj/org/apache/storm/util.clj | 11 ----------- .../jvm/org/apache/storm/utils/ConfigUtils.java | 10 +++++++++- .../src/jvm/org/apache/storm/utils/Utils.java | 17 +++++++++++++++++ .../org/apache/storm/messaging/netty_unit_test.clj | 14 +++++++------- .../org/apache/storm/security/auth/auth_test.clj | 15 ++++++++------- .../apache/storm/security/auth/drpc_auth_test.clj | 15 ++++++++------- .../storm/security/auth/nimbus_auth_test.clj | 17 +++++++++-------- 7 files changed, 58 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/ae619f31/storm-core/src/clj/org/apache/storm/util.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/util.clj b/storm-core/src/clj/org/apache/storm/util.clj index 72778bb..016fe55 100644 --- a/storm-core/src/clj/org/apache/storm/util.clj +++ b/storm-core/src/clj/org/apache/storm/util.clj @@ -143,17 +143,6 @@ true (throw ~error-local) ))))) -(letfn [(try-port [port] - (with-open [socket (java.net.ServerSocket. port)] - (.getLocalPort socket)))] - (defn available-port - ([] (try-port 0)) - ([preferred] - (try - (try-port preferred) - (catch java.io.IOException e - (available-port)))))) - (defn clojurify-structure [s] (prewalk (fn [x] http://git-wip-us.apache.org/repos/asf/storm/blob/ae619f31/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java b/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java index c6543d4..d7b7dbf 100644 --- a/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java +++ b/storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java @@ -25,7 +25,15 @@ import org.apache.commons.io.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.*; +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileWriter; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Reader; import java.lang.reflect.Field; import java.util.ArrayList; import java.util.HashMap; http://git-wip-us.apache.org/repos/asf/storm/blob/ae619f31/storm-core/src/jvm/org/apache/storm/utils/Utils.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/utils/Utils.java b/storm-core/src/jvm/org/apache/storm/utils/Utils.java index e59f83f..b8a6c1a 100644 --- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java +++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java @@ -100,6 +100,7 @@ import java.net.InetAddress; import java.net.URL; import java.net.URLDecoder; import java.net.UnknownHostException; +import java.net.ServerSocket; import java.nio.ByteBuffer; import java.nio.file.FileSystems; import java.nio.file.Files; @@ -1501,6 +1502,22 @@ public class Utils { } } + public static int getAvailablePort(int prefferedPort) { + int localPort = -1; + try(ServerSocket socket = new ServerSocket(prefferedPort)) { + localPort = socket.getLocalPort(); + } catch(IOException exp) { + if (prefferedPort > 0) { + return getAvailablePort(0); + } + } + return localPort; + } + + public static int getAvailablePort() { + return getAvailablePort(0); + } + /** * Determines if a zip archive contains a particular directory. * http://git-wip-us.apache.org/repos/asf/storm/blob/ae619f31/storm-core/test/clj/org/apache/storm/messaging/netty_unit_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/org/apache/storm/messaging/netty_unit_test.clj b/storm-core/test/clj/org/apache/storm/messaging/netty_unit_test.clj index 786045e..4b6ae0d 100644 --- a/storm-core/test/clj/org/apache/storm/messaging/netty_unit_test.clj +++ b/storm-core/test/clj/org/apache/storm/messaging/netty_unit_test.clj @@ -21,7 +21,7 @@ (:use [org.apache.storm.daemon.worker :only [is-connection-ready]]) (:import [java.util ArrayList])) -(def port (available-port)) +(def port (Utils/getAvailablePort)) (def task 1) ;; In a "real" cluster (or an integration test), Storm itself would ensure that a topology's workers would only be @@ -66,7 +66,7 @@ (log-message "1. Should send and receive a basic message") (let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz") context (TransportFactory/makeContext storm-conf) - port (available-port 6700) + port (Utils/getAvailablePort (int 6700)) resp (atom nil) server (.bind context nil port) _ (register-callback (fn [message] (reset! resp message)) server) @@ -104,7 +104,7 @@ (log-message "2 test load") (let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz") context (TransportFactory/makeContext storm-conf) - port (available-port 6700) + port (Utils/getAvailablePort (int 6700)) resp (atom nil) server (.bind context nil port) _ (register-callback (fn [message] (reset! resp message)) server) @@ -147,7 +147,7 @@ (log-message "3 Should send and receive a large message") (let [req_msg (apply str (repeat 2048000 'c')) context (TransportFactory/makeContext storm-conf) - port (available-port 6700) + port (Utils/getAvailablePort (int 6700)) resp (atom nil) server (.bind context nil port) _ (register-callback (fn [message] (reset! resp message)) server) @@ -186,7 +186,7 @@ (let [req_msg (String. "0123456789abcdefghijklmnopqrstuvwxyz") context (TransportFactory/makeContext storm-conf) resp (atom nil) - port (available-port 6700) + port (Utils/getAvailablePort (int 6700)) client (.connect context nil "localhost" port) server (Thread. @@ -234,7 +234,7 @@ resp (ArrayList.) received (atom 0) context (TransportFactory/makeContext storm-conf) - port (available-port 6700) + port (Utils/getAvailablePort (int 6700)) server (.bind context nil port) _ (register-callback (fn [message] (.add resp message) (swap! received inc)) server) client (.connect context nil "localhost" port) @@ -292,7 +292,7 @@ TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS false} resp (atom nil) context (TransportFactory/makeContext storm-conf) - port (available-port 6700) + port (Utils/getAvailablePort (int 6700)) client (.connect context nil "localhost" port) _ (.send client task (.getBytes req_msg)) server (.bind context nil port) http://git-wip-us.apache.org/repos/asf/storm/blob/ae619f31/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj b/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj index 54441c3..56367e8 100644 --- a/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj +++ b/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj @@ -38,7 +38,8 @@ (:use [org.apache.storm testing]) (:import [org.apache.storm.generated Nimbus Nimbus$Client Nimbus$Iface StormTopology SubmitOptions KillOptions RebalanceOptions ClusterSummary TopologyInfo Nimbus$Processor] - (org.json.simple JSONValue))) + (org.json.simple JSONValue)) + (:import [org.apache.storm.utils Utils])) (defn mk-principal [name] (reify Principal @@ -159,7 +160,7 @@ (is (= "someone" (.toLocal kptol (mk-principal "someone/host@realm")))))) (deftest Simple-authentication-test - (let [a-port (available-port)] + (let [a-port (Utils/getAvailablePort)] (with-server [a-port nil nil "org.apache.storm.security.auth.SimpleTransportPlugin" nil] (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig)) {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.SimpleTransportPlugin"}) @@ -177,7 +178,7 @@ (NimbusClient. storm-conf "localhost" a-port nimbus-timeout)))))))) (deftest negative-whitelist-authorization-test - (let [a-port (available-port)] + (let [a-port (Utils/getAvailablePort)] (with-server [a-port nil "org.apache.storm.security.auth.authorizer.SimpleWhitelistAuthorizer" "org.apache.storm.testing.SingleUserSimpleTransport" nil] @@ -191,7 +192,7 @@ (.close client))))) (deftest positive-whitelist-authorization-test - (let [a-port (available-port)] + (let [a-port (Utils/getAvailablePort)] (with-server [a-port nil "org.apache.storm.security.auth.authorizer.SimpleWhitelistAuthorizer" "org.apache.storm.testing.SingleUserSimpleTransport" {SimpleWhitelistAuthorizer/WHITELIST_USERS_CONF ["user"]}] @@ -334,7 +335,7 @@ (deftest positive-authorization-test - (let [a-port (available-port)] + (let [a-port (Utils/getAvailablePort)] (with-server [a-port nil "org.apache.storm.security.auth.authorizer.NoopAuthorizer" "org.apache.storm.security.auth.SimpleTransportPlugin" nil] @@ -347,7 +348,7 @@ (.close client))))) (deftest deny-authorization-test - (let [a-port (available-port)] + (let [a-port (Utils/getAvailablePort)] (with-server [a-port nil "org.apache.storm.security.auth.authorizer.DenyAuthorizer" "org.apache.storm.security.auth.SimpleTransportPlugin" nil] @@ -363,7 +364,7 @@ (.close client))))) (deftest digest-authentication-test - (let [a-port (available-port)] + (let [a-port (Utils/getAvailablePort)] (with-server [a-port "test/clj/org/apache/storm/security/auth/jaas_digest.conf" nil http://git-wip-us.apache.org/repos/asf/storm/blob/ae619f31/storm-core/test/clj/org/apache/storm/security/auth/drpc_auth_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/org/apache/storm/security/auth/drpc_auth_test.clj b/storm-core/test/clj/org/apache/storm/security/auth/drpc_auth_test.clj index 6b1aaa4..d0dfe2d 100644 --- a/storm-core/test/clj/org/apache/storm/security/auth/drpc_auth_test.clj +++ b/storm-core/test/clj/org/apache/storm/security/auth/drpc_auth_test.clj @@ -28,7 +28,8 @@ (:import [javax.security.auth Subject]) (:use [org.apache.storm util config log]) (:use [org.apache.storm.daemon common]) - (:use [org.apache.storm testing])) + (:use [org.apache.storm testing]) + (:import [org.apache.storm.utils Utils])) (def DRPC-TIMEOUT-SEC (* (/ TEST-TIMEOUT-MS 1000) 2)) @@ -64,8 +65,8 @@ )) (deftest deny-drpc-test - (let [client-port (available-port) - invocations-port (available-port (inc client-port)) + (let [client-port (Utils/getAvailablePort) + invocations-port (Utils/getAvailablePort (int(inc client-port))) storm-conf (clojurify-structure (ConfigUtils/readStormConfig))] (with-server [storm-conf "org.apache.storm.security.auth.authorizer.DenyAuthorizer" nil nil client-port invocations-port] @@ -79,8 +80,8 @@ (.close invocations))))) (deftest deny-drpc-digest-test - (let [client-port (available-port) - invocations-port (available-port (inc client-port)) + (let [client-port (Utils/getAvailablePort) + invocations-port (Utils/getAvailablePort (int (inc client-port))) storm-conf (clojurify-structure (ConfigUtils/readStormConfig))] (with-server [storm-conf "org.apache.storm.security.auth.authorizer.DenyAuthorizer" "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin" @@ -99,8 +100,8 @@ (defmacro with-simple-drpc-test-scenario [[strict? alice-client bob-client charlie-client alice-invok charlie-invok] & body] - (let [client-port (available-port) - invocations-port (available-port (inc client-port)) + (let [client-port (Utils/getAvailablePort) + invocations-port (Utils/getAvailablePort (int (inc client-port))) storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig)) {DRPC-AUTHORIZER-ACL-STRICT strict? DRPC-AUTHORIZER-ACL-FILENAME "drpc-simple-acl-test-scenario.yaml" http://git-wip-us.apache.org/repos/asf/storm/blob/ae619f31/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj ---------------------------------------------------------------------- diff --git a/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj b/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj index 307296a..eeb4813 100644 --- a/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj +++ b/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj @@ -24,10 +24,11 @@ (:import [org.apache.storm.generated NotAliveException]) (:import [org.apache.storm.security.auth AuthUtils ThriftServer ThriftClient ReqContext ThriftConnectionType]) - (:use [org.apache.storm util config log]) - (:use [org.apache.storm.daemon common nimbus]) - (:import [org.apache.storm.generated Nimbus Nimbus$Client Nimbus$Processor + (:import [org.apache.storm.generated Nimbus Nimbus$Client Nimbus$Processor AuthorizationException SubmitOptions TopologyInitialStatus KillOptions]) + (:import [org.apache.storm.utils Utils]) + (:use [org.apache.storm cluster util config log]) + (:use [org.apache.storm.daemon common nimbus]) (:require [conjure.core]) (:use [conjure core])) @@ -55,7 +56,7 @@ (.stop nimbus-server#))) (deftest Simple-authentication-test - (let [port (available-port)] + (let [port (Utils/getAvailablePort)] (with-test-cluster [port nil nil "org.apache.storm.security.auth.SimpleTransportPlugin"] (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig)) {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.SimpleTransportPlugin" @@ -68,7 +69,7 @@ (.close client))))) (deftest test-noop-authorization-w-simple-transport - (let [port (available-port)] + (let [port (Utils/getAvailablePort)] (with-test-cluster [port nil "org.apache.storm.security.auth.authorizer.NoopAuthorizer" "org.apache.storm.security.auth.SimpleTransportPlugin"] @@ -83,7 +84,7 @@ (.close client))))) (deftest test-deny-authorization-w-simple-transport - (let [port (available-port)] + (let [port (Utils/getAvailablePort)] (with-test-cluster [port nil "org.apache.storm.security.auth.authorizer.DenyAuthorizer" "org.apache.storm.security.auth.SimpleTransportPlugin"] @@ -121,7 +122,7 @@ (.close client))))) (deftest test-noop-authorization-w-sasl-digest - (let [port (available-port)] + (let [port (Utils/getAvailablePort)] (with-test-cluster [port "test/clj/org/apache/storm/security/auth/jaas_digest.conf" "org.apache.storm.security.auth.authorizer.NoopAuthorizer" @@ -139,7 +140,7 @@ (.close client))))) (deftest test-deny-authorization-w-sasl-digest - (let [port (available-port)] + (let [port (Utils/getAvailablePort)] (with-test-cluster [port "test/clj/org/apache/storm/security/auth/jaas_digest.conf" "org.apache.storm.security.auth.authorizer.DenyAuthorizer"
