[GitHub] spark pull request: [SPARK-4740] [WIP] Create multiple concurrent ...
GitHub user rxin opened a pull request:

https://github.com/apache/spark/pull/3625

[SPARK-4740] [WIP] Create multiple concurrent connections between two peer nodes in Netty.

Need to add test cases.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rxin/spark SPARK-4740

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/3625.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #3625

commit 4f216736024f48ed9fa3fca6ea5953495def8e51
Author: Reynold Xin r...@databricks.com
Date: 2014-12-05T21:20:58Z

[SPARK-4740] Create multiple concurrent connections between two peer nodes in Netty.

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3625#issuecomment-65857520

[Test build #24193 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24193/consoleFull) for PR 3625 at commit [`3e1306c`](https://github.com/apache/spark/commit/3e1306cd443fe3f580e44bf947b00223f0beb747).
* This patch merges cleanly.
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3625#issuecomment-65865836

[Test build #24193 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24193/consoleFull) for PR 3625 at commit [`3e1306c`](https://github.com/apache/spark/commit/3e1306cd443fe3f580e44bf947b00223f0beb747).
* This patch **fails Spark unit tests**.
* This patch merges cleanly.
* This patch adds no public classes.
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/3625#issuecomment-65865847

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24193/
Test FAILed.
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/3625#discussion_r21406023

--- Diff: network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java ---
@@ -56,12 +57,28 @@
  * TransportClient, all given {@link TransportClientBootstrap}s will be run.
  */
 public class TransportClientFactory implements Closeable {
+
+  /** A simple data structure to track the pool of clients between two peer nodes. */
+  private class ClientPool {
+    TransportClient[] clients;
+    Object[] locks;
+
+    public ClientPool() {
+      clients = new TransportClient[numConnectionsPerPeer];
--- End diff --

We can make this a private static class if we make this a constructor parameter.
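A minimal sketch of the suggestion above, assuming the pool size is handed to `ClientPool` through its constructor. `StaticPoolFactorySketch`, its stand-in `Client` type and `newPoolSlotCount()` are hypothetical names for illustration, not Spark's code. A static nested class holds no reference to the enclosing factory, so it can no longer read `numConnectionsPerPeer` directly, which is why the value has to become a parameter.

```java
class StaticPoolFactorySketch {
  /** Hypothetical stand-in for TransportClient; not Spark's class. */
  static final class Client {}

  /**
   * Static nested class: it holds no hidden reference to the enclosing factory,
   * so it cannot read the outer numConnectionsPerPeer field (javac rejects a
   * non-static field referenced from a static context). The size is passed in
   * through the constructor instead.
   */
  private static final class ClientPool {
    final Client[] clients;
    final Object[] locks;

    ClientPool(int numConnectionsPerPeer) {
      clients = new Client[numConnectionsPerPeer];
      locks = new Object[numConnectionsPerPeer];
      for (int i = 0; i < numConnectionsPerPeer; i++) {
        locks[i] = new Object();  // one monitor per connection slot
      }
    }
  }

  private final int numConnectionsPerPeer;

  StaticPoolFactorySketch(int numConnectionsPerPeer) {
    this.numConnectionsPerPeer = numConnectionsPerPeer;
  }

  /** Creates a pool for a newly seen peer and reports how many slots it has. */
  int newPoolSlotCount() {
    ClientPool pool = new ClientPool(numConnectionsPerPeer);
    return pool.clients.length;
  }
}
```

A side benefit of the static variant is that each `ClientPool` no longer keeps its factory reachable through an implicit outer-instance reference.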
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/3625#discussion_r21406649

--- Diff: network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java ---
@@ -97,23 +116,45 @@ public TransportClient createClient(String remoteHost, int remotePort) throws IO
     // Get connection from the connection pool first.
     // If it is not found or not active, create a new one.
     final InetSocketAddress address = new InetSocketAddress(remoteHost, remotePort);
-    TransportClient cachedClient = connectionPool.get(address);
+
+    // Create the ClientPool if we don't have it yet.
+    ClientPool clientPool = connectionPool.get(address);
+    if (clientPool == null) {
+      clientPool = connectionPool.putIfAbsent(address, new ClientPool());
--- End diff --

putIfAbsent returns the *previous* value, so this would be null
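`ConcurrentMap.putIfAbsent` returns the value previously associated with the key, or `null` when the new value was stored, so assigning its result directly leaves `clientPool` null on first use. One way to obtain the pool that actually ended up in the map is sketched below. The class name is hypothetical, the map is keyed by a `String` instead of an `InetSocketAddress` to keep the example self-contained, and the pool is reduced to a placeholder.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class PoolLookupSketch {
  /** Hypothetical placeholder for the per-peer ClientPool. */
  static final class ClientPool {
    final Object[] clients;

    ClientPool(int size) {
      clients = new Object[size];
    }
  }

  private final int numConnectionsPerPeer;
  private final ConcurrentMap<String, ClientPool> connectionPool =
      new ConcurrentHashMap<String, ClientPool>();

  PoolLookupSketch(int numConnectionsPerPeer) {
    this.numConnectionsPerPeer = numConnectionsPerPeer;
  }

  ClientPool getOrCreatePool(String address) {
    ClientPool clientPool = connectionPool.get(address);
    if (clientPool == null) {
      ClientPool fresh = new ClientPool(numConnectionsPerPeer);
      // putIfAbsent returns the value already mapped (another thread won the race),
      // or null if our value was inserted. Its result alone is never the pool to use.
      ClientPool previous = connectionPool.putIfAbsent(address, fresh);
      clientPool = (previous != null) ? previous : fresh;
    }
    return clientPool;
  }
}
```

An equivalent variant calls `putIfAbsent` and then re-reads the map with `get(address)`. Either way, every caller for a given address ends up sharing the same pool instance.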
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/3625#discussion_r21406760

--- Diff: network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java ---
@@ -97,23 +116,45 @@ public TransportClient createClient(String remoteHost, int remotePort) throws IO
     // Get connection from the connection pool first.
     // If it is not found or not active, create a new one.
     final InetSocketAddress address = new InetSocketAddress(remoteHost, remotePort);
-    TransportClient cachedClient = connectionPool.get(address);
+
+    // Create the ClientPool if we don't have it yet.
+    ClientPool clientPool = connectionPool.get(address);
+    if (clientPool == null) {
+      clientPool = connectionPool.putIfAbsent(address, new ClientPool());
+    }
+
+    int clientIndex = rand.nextInt(numConnectionsPerPeer);
+    TransportClient cachedClient = clientPool.clients[clientIndex];
     if (cachedClient != null) {
       if (cachedClient.isActive()) {
         logger.trace("Returning cached connection to {}: {}", address, cachedClient);
         return cachedClient;
       } else {
         logger.info("Found inactive connection to {}, closing it.", address);
-        connectionPool.remove(address, cachedClient); // Remove inactive clients.
+        clientPool.clients[clientIndex] = null; // Remove inactive clients.
--- End diff --

Shouldn't this be behind a lock?
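A sketch of one way to answer the locking question, assuming one monitor per slot (the `locks` array from the `ClientPool` diff) and assuming it is acceptable to open a connection while holding that slot's lock. `SlotLockingSketch`, its `Client` stand-in and the `created` counter are hypothetical; `new Client()` takes the place of the real connect-and-bootstrap path.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

class SlotLockingSketch {
  /** Hypothetical stand-in for TransportClient. */
  static final class Client {
    private volatile boolean active = true;

    boolean isActive() { return active; }

    void close() { active = false; }
  }

  private final Client[] clients;
  private final Object[] locks;
  /** Counts connections opened; stands in for the real connect-and-bootstrap path. */
  final AtomicInteger created = new AtomicInteger();

  SlotLockingSketch(int numConnectionsPerPeer) {
    clients = new Client[numConnectionsPerPeer];
    locks = new Object[numConnectionsPerPeer];
    for (int i = 0; i < numConnectionsPerPeer; i++) {
      locks[i] = new Object();
    }
  }

  /** Picks a random slot, mirroring rand.nextInt(numConnectionsPerPeer) in the diff. */
  Client getClient() {
    return getClient(ThreadLocalRandom.current().nextInt(clients.length));
  }

  Client getClient(int clientIndex) {
    // Check, clear and refill the slot under one lock, so two threads can neither
    // both replace the same inactive client nor overwrite a client another thread
    // has just stored in that slot.
    synchronized (locks[clientIndex]) {
      Client cached = clients[clientIndex];
      if (cached != null && cached.isActive()) {
        return cached;
      }
      if (cached != null) {
        cached.close();  // drop the inactive connection
      }
      Client fresh = new Client();  // placeholder for actually connecting
      created.incrementAndGet();
      clients[clientIndex] = fresh;
      return fresh;
    }
  }
}
```

Holding the slot lock while connecting means callers that land on the same slot wait for a single connection attempt instead of each opening their own, while callers on other slots are not blocked.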
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/3625#discussion_r21406852

--- Diff: network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java ---
@@ -143,43 +184,41 @@ public void initChannel(SocketChannel ch) {
     assert client != null : "Channel future completed successfully with null client";
     // Execute any client bootstraps synchronously before marking the Client as successful.
-    long preBootstrap = System.currentTimeMillis();
+    long preBootstrap = System.nanoTime();
     logger.debug("Connection to {} successful, running bootstraps...", address);
     try {
       for (TransportClientBootstrap clientBootstrap : clientBootstraps) {
         clientBootstrap.doBootstrap(client);
       }
     } catch (Exception e) { // catch non-RuntimeExceptions too as bootstrap may be written in Scala
-      long bootstrapTime = System.currentTimeMillis() - preBootstrap;
-      logger.error("Exception while bootstrapping client after " + bootstrapTime + " ms", e);
+      long bootstrapTimeMs = (System.nanoTime() - preBootstrap) / 1000000;
+      logger.error("Exception while bootstrapping client after " + bootstrapTimeMs + " ms", e);
       client.close();
       throw Throwables.propagate(e);
     }
-    long postBootstrap = System.currentTimeMillis();
-
-    // Successful connection bootstrap -- in the event that two threads raced to create a client,
-    // use the first one that was put into the connectionPool and close the one we made here.
-    TransportClient oldClient = connectionPool.putIfAbsent(address, client);
-    if (oldClient == null) {
-      logger.debug("Successfully created connection to {} after {} ms ({} ms spent in bootstraps)",
-        address, postBootstrap - preConnect, postBootstrap - preBootstrap);
-      return client;
-    } else {
-      logger.debug("Two clients were created concurrently after {} ms, second will be disposed.",
-        postBootstrap - preConnect);
-      client.close();
-      return oldClient;
-    }
+    long postBootstrap = System.nanoTime();
+
+    logger.debug("Successfully created connection to {} after {} ms ({} ms spent in bootstraps)",
+        address, (postBootstrap - preConnect) / 1000000, (postBootstrap - preBootstrap) / 1000000);
--- End diff --

2 spaces for indent
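`System.nanoTime()` is a monotonic clock intended for measuring elapsed time, whereas `System.currentTimeMillis()` follows the wall clock and can jump when the clock is adjusted. Since 1 ms is 1,000,000 ns, a nanosecond difference must be divided by 1,000,000 to get milliseconds. `java.util.concurrent.TimeUnit` performs the same conversion without a hand-written constant, as in this small sketch (the class and method names are hypothetical).

```java
import java.util.concurrent.TimeUnit;

class ElapsedTimeSketch {
  /** Converts a System.nanoTime() difference to whole milliseconds (1 ms = 1,000,000 ns). */
  static long elapsedMs(long startNanos, long endNanos) {
    return TimeUnit.NANOSECONDS.toMillis(endNanos - startNanos);
  }

  /** Times a task with the monotonic clock, the way the bootstrap timing in the diff does. */
  static long timeMs(Runnable task) {
    long start = System.nanoTime();
    task.run();
    return elapsedMs(start, System.nanoTime());
  }
}
```

`TimeUnit.NANOSECONDS.toMillis` truncates, so 999,999 ns reports as 0 ms, which matches integer division by 1,000,000.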
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/3625#discussion_r21406868

--- Diff: network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java ---
@@ -143,43 +184,41 @@ public void initChannel(SocketChannel ch) {
     assert client != null : "Channel future completed successfully with null client";
     // Execute any client bootstraps synchronously before marking the Client as successful.
-    long preBootstrap = System.currentTimeMillis();
+    long preBootstrap = System.nanoTime();
     logger.debug("Connection to {} successful, running bootstraps...", address);
     try {
       for (TransportClientBootstrap clientBootstrap : clientBootstraps) {
         clientBootstrap.doBootstrap(client);
       }
     } catch (Exception e) { // catch non-RuntimeExceptions too as bootstrap may be written in Scala
-      long bootstrapTime = System.currentTimeMillis() - preBootstrap;
-      logger.error("Exception while bootstrapping client after " + bootstrapTime + " ms", e);
+      long bootstrapTimeMs = (System.nanoTime() - preBootstrap) / 1000000;
+      logger.error("Exception while bootstrapping client after " + bootstrapTimeMs + " ms", e);
       client.close();
       throw Throwables.propagate(e);
     }
-    long postBootstrap = System.currentTimeMillis();
-
-    // Successful connection bootstrap -- in the event that two threads raced to create a client,
-    // use the first one that was put into the connectionPool and close the one we made here.
-    TransportClient oldClient = connectionPool.putIfAbsent(address, client);
-    if (oldClient == null) {
-      logger.debug("Successfully created connection to {} after {} ms ({} ms spent in bootstraps)",
-        address, postBootstrap - preConnect, postBootstrap - preBootstrap);
-      return client;
-    } else {
-      logger.debug("Two clients were created concurrently after {} ms, second will be disposed.",
-        postBootstrap - preConnect);
-      client.close();
-      return oldClient;
-    }
+    long postBootstrap = System.nanoTime();
+
+    logger.debug("Successfully created connection to {} after {} ms ({} ms spent in bootstraps)",
+        address, (postBootstrap - preConnect) / 1000000, (postBootstrap - preBootstrap) / 1000000);
+
+    return client;
   }
   /** Close all connections in the connection pool, and shutdown the worker thread pool. */
   @Override
   public void close() {
-    for (TransportClient client : connectionPool.values()) {
-      try {
-        client.close();
-      } catch (RuntimeException e) {
-        logger.warn("Ignoring exception during close", e);
+    // Go through all clients and close them if they are active.
+    for (ClientPool clientPool : connectionPool.values()) {
+      for (int i = 0; i < clientPool.clients.length; i++) {
+        TransportClient client = clientPool.clients[i];
+        if (client != null) {
+          clientPool.clients[i] = null;
--- End diff --

need synchronization here too
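A possible shape for a synchronized `close()`, assuming the same per-slot `locks` that guard client creation: each slot is emptied under its lock, and the client is closed only after the lock is released, so a slow close does not stall other threads waiting on that slot. `CloseSketch` and its stand-in types are hypothetical; the catch block only marks where the pre-PR `close()` logged a warning and kept going.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class CloseSketch {
  /** Hypothetical stand-in for TransportClient. */
  static final class Client {
    volatile boolean closed = false;

    void close() { closed = true; }
  }

  /** Hypothetical stand-in for the PR's ClientPool. */
  static final class ClientPool {
    final Client[] clients;
    final Object[] locks;

    ClientPool(int size) {
      clients = new Client[size];
      locks = new Object[size];
      for (int i = 0; i < size; i++) {
        locks[i] = new Object();
      }
    }
  }

  final ConcurrentMap<String, ClientPool> connectionPool =
      new ConcurrentHashMap<String, ClientPool>();

  void close() {
    for (ClientPool clientPool : connectionPool.values()) {
      for (int i = 0; i < clientPool.clients.length; i++) {
        Client client;
        // Take the client out of its slot under the same lock that guards creation,
        // so no other thread can hand it out or replace it mid-way.
        synchronized (clientPool.locks[i]) {
          client = clientPool.clients[i];
          clientPool.clients[i] = null;
        }
        if (client != null) {
          try {
            client.close();  // closed outside the lock
          } catch (RuntimeException e) {
            // The pre-PR close() logged a warning here and continued with the next client.
          }
        }
      }
    }
    connectionPool.clear();
  }
}
```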
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3625#issuecomment-65869095

[Test build #24198 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24198/consoleFull) for PR 3625 at commit [`9076b4a`](https://github.com/apache/spark/commit/9076b4a0f9e648ae5451795c3d024d491accd6e0).
* This patch merges cleanly.
Github user aarondav commented on the pull request:

https://github.com/apache/spark/pull/3625#issuecomment-65869325

Looks mostly good to me, a few remaining synchronization issues. Will take another long look after you address all comments. I'd really appreciate a test, though, if we can get one in -- we really don't want to be regressing at this point, and we also really want to make sure we're fixing the issue.
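A test along these lines could exercise the synchronization directly. The sketch below is a hypothetical, self-contained harness, not a test against Spark's `TransportClientFactory`: many threads hit one small per-peer pool at once. It then checks that callers never see more distinct clients than there are slots, and that every client created was actually handed out, so no connection leaked through a lost race.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ConcurrentPoolCheck {
  /** Minimal per-peer pool with one lock per slot (hypothetical stand-in, not Spark's code). */
  static final class Pool {
    final Object[] clients;
    final Object[] locks;
    final AtomicInteger created = new AtomicInteger();

    Pool(int size) {
      clients = new Object[size];
      locks = new Object[size];
      for (int i = 0; i < size; i++) {
        locks[i] = new Object();
      }
    }

    Object get() {
      int i = ThreadLocalRandom.current().nextInt(clients.length);
      synchronized (locks[i]) {
        if (clients[i] == null) {
          clients[i] = new Object();  // placeholder for opening a connection
          created.incrementAndGet();
        }
        return clients[i];
      }
    }
  }

  /** Calls pool.get() from many threads at once and returns how many distinct clients were seen. */
  static int distinctClientsSeen(final Pool pool, int threads, final int callsPerThread) {
    final Set<Object> seen = Collections.newSetFromMap(new ConcurrentHashMap<Object, Boolean>());
    final CountDownLatch start = new CountDownLatch(1);
    ExecutorService executor = Executors.newFixedThreadPool(threads);
    for (int t = 0; t < threads; t++) {
      executor.execute(new Runnable() {
        @Override
        public void run() {
          try {
            start.await();  // release all workers together so they contend on the pool
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
          }
          for (int c = 0; c < callsPerThread; c++) {
            seen.add(pool.get());
          }
        }
      });
    }
    start.countDown();
    executor.shutdown();
    try {
      if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
        throw new IllegalStateException("workers did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for workers", e);
    }
    return seen.size();
  }
}
```

With correct per-slot locking the distinct count can never exceed the pool size, and it always equals `created`; without the lock, two threads racing on an empty slot could both connect and one client would be silently dropped.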