[GitHub] spark pull request: [SPARK-4740] [WIP] Create multiple concurrent ...

2014-12-05 Thread rxin
GitHub user rxin opened a pull request:

https://github.com/apache/spark/pull/3625

[SPARK-4740] [WIP] Create multiple concurrent connections between two peer 
nodes in Netty.

Need to test and add test cases.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rxin/spark SPARK-4740

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/3625.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3625


commit 4f216736024f48ed9fa3fca6ea5953495def8e51
Author: Reynold Xin <r...@databricks.com>
Date:   2014-12-05T21:20:58Z

[SPARK-4740] Create multiple concurrent connections between two peer nodes 
in Netty.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4740] [WIP] Create multiple concurrent ...

2014-12-05 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3625#issuecomment-65857520
  
  [Test build #24193 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24193/consoleFull) for PR 3625 at commit [`3e1306c`](https://github.com/apache/spark/commit/3e1306cd443fe3f580e44bf947b00223f0beb747).
   * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-4740] [WIP] Create multiple concurrent ...

2014-12-05 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3625#issuecomment-65865836
  
  [Test build #24193 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24193/consoleFull) for PR 3625 at commit [`3e1306c`](https://github.com/apache/spark/commit/3e1306cd443fe3f580e44bf947b00223f0beb747).
   * This patch **fails Spark unit tests**.
   * This patch merges cleanly.
   * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-4740] [WIP] Create multiple concurrent ...

2014-12-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/3625#issuecomment-65865847
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24193/





[GitHub] spark pull request: [SPARK-4740] [WIP] Create multiple concurrent ...

2014-12-05 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/3625#discussion_r21406023
  
--- Diff: network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java ---
@@ -56,12 +57,28 @@
  * TransportClient, all given {@link TransportClientBootstrap}s will be run.
  */
 public class TransportClientFactory implements Closeable {
+
+  /** A simple data structure to track the pool of clients between two peer nodes. */
+  private class ClientPool {
+    TransportClient[] clients;
+    Object[] locks;
+
+    public ClientPool() {
+      clients = new TransportClient[numConnectionsPerPeer];
--- End diff --

We can make this a private static class if we make numConnectionsPerPeer a constructor parameter.
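A minimal sketch of that suggestion, assuming the pool size is the only thing ClientPool needs from the enclosing factory. StaticPoolSketch and FakeTransportClient are hypothetical names so the example compiles on its own; allocating one lock object per slot is also an assumption, since the quoted hunk ends before the locks array is filled in.

```java
// Hypothetical stand-in for TransportClient so the sketch compiles on its own.
class FakeTransportClient {}

class StaticPoolSketch {
  /**
   * A static nested class has no access to the enclosing instance, so the pool
   * size arrives as a constructor argument instead of being read from
   * numConnectionsPerPeer.
   */
  static class ClientPool {
    final FakeTransportClient[] clients;
    final Object[] locks;

    ClientPool(int size) {
      clients = new FakeTransportClient[size];
      locks = new Object[size];
      for (int i = 0; i < size; i++) {
        locks[i] = new Object();  // one lock per connection slot (assumption)
      }
    }
  }

  private final int numConnectionsPerPeer;

  StaticPoolSketch(int numConnectionsPerPeer) {
    this.numConnectionsPerPeer = numConnectionsPerPeer;
  }

  ClientPool newPool() {
    // The outer instance passes its setting down explicitly.
    return new ClientPool(numConnectionsPerPeer);
  }

  public static void main(String[] args) {
    ClientPool pool = new StaticPoolSketch(4).newPool();
    System.out.println(pool.clients.length + " slots, " + pool.locks.length + " locks");
  }
}
```

Besides removing the hidden dependency on the factory's field, a static nested class does not keep an implicit reference to the enclosing TransportClientFactory instance in every pool.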





[GitHub] spark pull request: [SPARK-4740] [WIP] Create multiple concurrent ...

2014-12-05 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/3625#discussion_r21406649
  
--- Diff: network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java ---
@@ -97,23 +116,45 @@ public TransportClient createClient(String remoteHost, int remotePort) throws IO
     // Get connection from the connection pool first.
     // If it is not found or not active, create a new one.
     final InetSocketAddress address = new InetSocketAddress(remoteHost, remotePort);
-    TransportClient cachedClient = connectionPool.get(address);
+
+    // Create the ClientPool if we don't have it yet.
+    ClientPool clientPool = connectionPool.get(address);
+    if (clientPool == null) {
+      clientPool = connectionPool.putIfAbsent(address, new ClientPool());
--- End diff --

putIfAbsent returns the *previous* value, so this would be null whenever our new ClientPool is the one that gets inserted.
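To make the point concrete, a minimal sketch with a hypothetical Pool placeholder and String keys standing in for InetSocketAddress. getPoolBuggy reproduces the shape in the diff; getPool falls back to the freshly created value when putIfAbsent returns null.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class PutIfAbsentSketch {
  // Hypothetical placeholder for the per-peer ClientPool.
  static class Pool {}

  static final ConcurrentMap<String, Pool> connectionPool = new ConcurrentHashMap<>();

  /** Shape from the diff: putIfAbsent returns the previous value, i.e. null on first insert. */
  static Pool getPoolBuggy(String address) {
    Pool pool = connectionPool.get(address);
    if (pool == null) {
      pool = connectionPool.putIfAbsent(address, new Pool());  // null if our Pool was inserted
    }
    return pool;
  }

  /** Fixed shape: use the existing value if another thread won, otherwise our own. */
  static Pool getPool(String address) {
    Pool pool = connectionPool.get(address);
    if (pool == null) {
      Pool fresh = new Pool();
      Pool previous = connectionPool.putIfAbsent(address, fresh);
      pool = (previous != null) ? previous : fresh;
    }
    return pool;
  }

  public static void main(String[] args) {
    System.out.println(getPoolBuggy("host-a") == null);  // true
    connectionPool.clear();
    Pool p = getPool("host-a");
    System.out.println(p != null && p == getPool("host-a"));  // true
  }
}
```

On Java 8 and later, `connectionPool.computeIfAbsent(address, k -> new Pool())` performs the lookup and insertion atomically in one call; the two-step pattern above also works on older JDKs.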





[GitHub] spark pull request: [SPARK-4740] [WIP] Create multiple concurrent ...

2014-12-05 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/3625#discussion_r21406760
  
--- Diff: network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java ---
@@ -97,23 +116,45 @@ public TransportClient createClient(String remoteHost, int remotePort) throws IO
     // Get connection from the connection pool first.
     // If it is not found or not active, create a new one.
     final InetSocketAddress address = new InetSocketAddress(remoteHost, remotePort);
-    TransportClient cachedClient = connectionPool.get(address);
+
+    // Create the ClientPool if we don't have it yet.
+    ClientPool clientPool = connectionPool.get(address);
+    if (clientPool == null) {
+      clientPool = connectionPool.putIfAbsent(address, new ClientPool());
+    }
+
+    int clientIndex = rand.nextInt(numConnectionsPerPeer);
+    TransportClient cachedClient = clientPool.clients[clientIndex];
     if (cachedClient != null) {
       if (cachedClient.isActive()) {
         logger.trace("Returning cached connection to {}: {}", address, cachedClient);
         return cachedClient;
       } else {
         logger.info("Found inactive connection to {}, closing it.", address);
-        connectionPool.remove(address, cachedClient); // Remove inactive clients.
+        clientPool.clients[clientIndex] = null;  // Remove inactive clients.
--- End diff --

Shouldn't this be behind a lock?
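One way to address this, sketched with hypothetical names (SlotLockSketch, Client, connect()) rather than Spark's classes: keep an unlocked fast path for live clients, but replace a missing or inactive client only while holding that slot's entry in locks, and re-check the slot after acquiring the lock. The AtomicReferenceArray is this sketch's own choice for safe unlocked reads; the diff uses a plain array.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReferenceArray;

class SlotLockSketch {
  // Hypothetical stand-in for TransportClient; `active` can be flipped to simulate a dead channel.
  static class Client {
    volatile boolean active = true;

    boolean isActive() {
      return active;
    }
  }

  // AtomicReferenceArray gives the unlocked fast path a volatile read of each slot.
  final AtomicReferenceArray<Client> clients;
  final Object[] locks;

  SlotLockSketch(int size) {
    clients = new AtomicReferenceArray<>(size);
    locks = new Object[size];
    for (int i = 0; i < size; i++) {
      locks[i] = new Object();
    }
  }

  // Hypothetical connect step; a real factory would open and bootstrap a channel here.
  private Client connect() {
    return new Client();
  }

  Client get() {
    int i = ThreadLocalRandom.current().nextInt(clients.length());
    Client cached = clients.get(i);
    if (cached != null && cached.isActive()) {
      return cached;  // fast path: no lock needed to hand out a live client
    }
    synchronized (locks[i]) {
      // Re-check under the lock: another thread may already have replaced this slot.
      cached = clients.get(i);
      if (cached == null || !cached.isActive()) {
        cached = connect();
        clients.set(i, cached);
      }
      return cached;
    }
  }

  public static void main(String[] args) {
    SlotLockSketch pool = new SlotLockSketch(2);
    Client first = pool.get();
    first.active = false;  // simulate the connection dying
    System.out.println(pool.get().isActive());  // true
  }
}
```

Because only one thread per slot can pass the re-check, two racing callers no longer both open a connection for the same slot. The sketch drops the dead client without closing it; a real version would close it as well.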





[GitHub] spark pull request: [SPARK-4740] [WIP] Create multiple concurrent ...

2014-12-05 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/3625#discussion_r21406852
  
--- Diff: network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java ---
@@ -143,43 +184,41 @@ public void initChannel(SocketChannel ch) {
     assert client != null : "Channel future completed successfully with null client";
 
     // Execute any client bootstraps synchronously before marking the Client as successful.
-    long preBootstrap = System.currentTimeMillis();
+    long preBootstrap = System.nanoTime();
     logger.debug("Connection to {} successful, running bootstraps...", address);
     try {
       for (TransportClientBootstrap clientBootstrap : clientBootstraps) {
         clientBootstrap.doBootstrap(client);
       }
     } catch (Exception e) { // catch non-RuntimeExceptions too as bootstrap may be written in Scala
-      long bootstrapTime = System.currentTimeMillis() - preBootstrap;
-      logger.error("Exception while bootstrapping client after " + bootstrapTime + " ms", e);
+      long bootstrapTimeMs = (System.nanoTime() - preBootstrap) / 1000000;
+      logger.error("Exception while bootstrapping client after " + bootstrapTimeMs + " ms", e);
       client.close();
       throw Throwables.propagate(e);
     }
-    long postBootstrap = System.currentTimeMillis();
-
-    // Successful connection & bootstrap -- in the event that two threads raced to create a client,
-    // use the first one that was put into the connectionPool and close the one we made here.
-    TransportClient oldClient = connectionPool.putIfAbsent(address, client);
-    if (oldClient == null) {
-      logger.debug("Successfully created connection to {} after {} ms ({} ms spent in bootstraps)",
-        address, postBootstrap - preConnect, postBootstrap - preBootstrap);
-      return client;
-    } else {
-      logger.debug("Two clients were created concurrently after {} ms, second will be disposed.",
-        postBootstrap - preConnect);
-      client.close();
-      return oldClient;
-    }
+    long postBootstrap = System.nanoTime();
+
+    logger.debug("Successfully created connection to {} after {} ms ({} ms spent in bootstraps)",
+        address, (postBootstrap - preConnect) / 1000000, (postBootstrap - preBootstrap) / 1000000);
--- End diff --

2 spaces for indent
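The same hunk switches the timing from System.currentTimeMillis() to System.nanoTime(), which is intended for measuring elapsed time and is not tied to the wall clock. A small sketch of the nanosecond-to-millisecond conversion, with the continuation line indented by 2 spaces; ElapsedTimeSketch and elapsedMs are hypothetical helpers, and using TimeUnit instead of a literal divisor is a suggestion, not what the patch does.

```java
import java.util.concurrent.TimeUnit;

class ElapsedTimeSketch {
  /** Converts a System.nanoTime() interval to whole milliseconds (truncating). */
  static long elapsedMs(long startNanos, long endNanos) {
    // TimeUnit avoids hand-written divisors such as 1000000, which are easy to get wrong.
    return TimeUnit.NANOSECONDS.toMillis(endNanos - startNanos);
  }

  public static void main(String[] args) throws InterruptedException {
    long preConnect = System.nanoTime();
    Thread.sleep(5);
    long postBootstrap = System.nanoTime();
    // Continuation line indented by 2 spaces.
    System.out.println(String.format("Connected after %d ms",
      elapsedMs(preConnect, postBootstrap)));
  }
}
```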





[GitHub] spark pull request: [SPARK-4740] [WIP] Create multiple concurrent ...

2014-12-05 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/3625#discussion_r21406868
  
--- Diff: network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java ---
@@ -143,43 +184,41 @@ public void initChannel(SocketChannel ch) {
     assert client != null : "Channel future completed successfully with null client";
 
     // Execute any client bootstraps synchronously before marking the Client as successful.
-    long preBootstrap = System.currentTimeMillis();
+    long preBootstrap = System.nanoTime();
     logger.debug("Connection to {} successful, running bootstraps...", address);
     try {
       for (TransportClientBootstrap clientBootstrap : clientBootstraps) {
         clientBootstrap.doBootstrap(client);
       }
     } catch (Exception e) { // catch non-RuntimeExceptions too as bootstrap may be written in Scala
-      long bootstrapTime = System.currentTimeMillis() - preBootstrap;
-      logger.error("Exception while bootstrapping client after " + bootstrapTime + " ms", e);
+      long bootstrapTimeMs = (System.nanoTime() - preBootstrap) / 1000000;
+      logger.error("Exception while bootstrapping client after " + bootstrapTimeMs + " ms", e);
       client.close();
       throw Throwables.propagate(e);
     }
-    long postBootstrap = System.currentTimeMillis();
-
-    // Successful connection & bootstrap -- in the event that two threads raced to create a client,
-    // use the first one that was put into the connectionPool and close the one we made here.
-    TransportClient oldClient = connectionPool.putIfAbsent(address, client);
-    if (oldClient == null) {
-      logger.debug("Successfully created connection to {} after {} ms ({} ms spent in bootstraps)",
-        address, postBootstrap - preConnect, postBootstrap - preBootstrap);
-      return client;
-    } else {
-      logger.debug("Two clients were created concurrently after {} ms, second will be disposed.",
-        postBootstrap - preConnect);
-      client.close();
-      return oldClient;
-    }
+    long postBootstrap = System.nanoTime();
+
+    logger.debug("Successfully created connection to {} after {} ms ({} ms spent in bootstraps)",
+        address, (postBootstrap - preConnect) / 1000000, (postBootstrap - preBootstrap) / 1000000);
+
+    return client;
   }
 
   /** Close all connections in the connection pool, and shutdown the worker thread pool. */
   @Override
   public void close() {
-    for (TransportClient client : connectionPool.values()) {
-      try {
-        client.close();
-      } catch (RuntimeException e) {
-        logger.warn("Ignoring exception during close", e);
+    // Go through all clients and close them if they are active.
+    for (ClientPool clientPool : connectionPool.values()) {
+      for (int i = 0; i < clientPool.clients.length; i++) {
+        TransportClient client = clientPool.clients[i];
+        if (client != null) {
+          clientPool.clients[i] = null;
--- End diff --
need synchronization here too
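A minimal sketch of close() with that synchronization, assuming that get() writes each slot only while holding the same per-slot lock. CloseSketch, Client and ClientPool are hypothetical stand-ins for the real classes; exceptions are printed to stderr here where the removed code logged them with logger.warn.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class CloseSketch {
  // Hypothetical stand-in for TransportClient.
  static class Client {
    volatile boolean closed = false;

    void close() {
      closed = true;
    }
  }

  // Hypothetical pool with the same clients/locks layout as the diff's ClientPool.
  static class ClientPool {
    final Client[] clients;
    final Object[] locks;

    ClientPool(int size) {
      clients = new Client[size];
      locks = new Object[size];
      for (int i = 0; i < size; i++) {
        locks[i] = new Object();
      }
    }
  }

  final ConcurrentMap<String, ClientPool> connectionPool = new ConcurrentHashMap<>();

  void close() {
    for (ClientPool clientPool : connectionPool.values()) {
      for (int i = 0; i < clientPool.clients.length; i++) {
        Client client;
        // Read and clear the slot under its lock; otherwise a get() could install a new
        // client between the read and the clear, and that client would never be closed.
        synchronized (clientPool.locks[i]) {
          client = clientPool.clients[i];
          clientPool.clients[i] = null;
        }
        // Close outside the lock so a slow close() does not hold up callers of this slot.
        if (client != null) {
          try {
            client.close();
          } catch (RuntimeException e) {
            System.err.println("Ignoring exception during close: " + e);
          }
        }
      }
    }
  }

  public static void main(String[] args) {
    CloseSketch factory = new CloseSketch();
    ClientPool pool = new ClientPool(2);
    Client a = new Client();
    pool.clients[0] = a;
    factory.connectionPool.put("host-a", pool);
    factory.close();
    System.out.println(a.closed + " " + (pool.clients[0] == null));  // true true
  }
}
```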





[GitHub] spark pull request: [SPARK-4740] [WIP] Create multiple concurrent ...

2014-12-05 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3625#issuecomment-65869095
  
  [Test build #24198 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24198/consoleFull) for PR 3625 at commit [`9076b4a`](https://github.com/apache/spark/commit/9076b4a0f9e648ae5451795c3d024d491accd6e0).
   * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4740] [WIP] Create multiple concurrent ...

2014-12-05 Thread aarondav
Github user aarondav commented on the pull request:

https://github.com/apache/spark/pull/3625#issuecomment-65869325
  
Looks mostly good to me; there are a few remaining synchronization issues. I will take another long look after you address all comments. I'd really appreciate a test, though, if we can get one in -- we really don't want to be regressing at this point, and we also really want to make sure we're fixing the issue.
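As a starting point for such a test, a sketch that hammers a pool from several threads and checks two invariants: no more distinct clients than slots, and every created connection was actually handed out. It uses a hypothetical in-memory Pool with the random-slot and per-slot-lock scheme discussed in this thread, not TransportClientFactory; a real test would have to go through Spark's transport classes and a running server. ConcurrentHashMap.newKeySet() requires Java 8.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;

class ConcurrentPoolCheck {
  // Hypothetical stand-in for TransportClient.
  static class Client {}

  // Hypothetical pool: random slot choice plus a lock per slot.
  static class Pool {
    final AtomicReferenceArray<Client> clients;
    final Object[] locks;
    final AtomicInteger created = new AtomicInteger();  // counts simulated connects

    Pool(int size) {
      clients = new AtomicReferenceArray<>(size);
      locks = new Object[size];
      for (int i = 0; i < size; i++) {
        locks[i] = new Object();
      }
    }

    Client get() {
      int i = ThreadLocalRandom.current().nextInt(clients.length());
      Client c = clients.get(i);
      if (c != null) {
        return c;
      }
      synchronized (locks[i]) {
        c = clients.get(i);
        if (c == null) {
          c = new Client();
          created.incrementAndGet();
          clients.set(i, c);
        }
        return c;
      }
    }
  }

  /** Calls get() from several threads at once; returns how many distinct clients came back. */
  static int distinctClients(Pool pool, int threads, int callsPerThread) {
    Set<Client> seen = ConcurrentHashMap.newKeySet();
    ExecutorService executor = Executors.newFixedThreadPool(threads);
    CountDownLatch start = new CountDownLatch(1);  // release all workers together
    List<Future<?>> futures = new ArrayList<>();
    try {
      for (int t = 0; t < threads; t++) {
        futures.add(executor.submit(() -> {
          start.await();
          for (int k = 0; k < callsPerThread; k++) {
            seen.add(pool.get());
          }
          return null;
        }));
      }
      start.countDown();
      for (Future<?> f : futures) {
        f.get();  // rethrows any failure from a worker thread
      }
    } catch (Exception e) {
      throw new RuntimeException(e);
    } finally {
      executor.shutdownNow();
    }
    return seen.size();
  }

  public static void main(String[] args) {
    Pool pool = new Pool(3);
    int distinct = distinctClients(pool, 8, 1000);
    if (distinct > 3 || pool.created.get() != distinct) {
      throw new AssertionError("distinct=" + distinct + ", created=" + pool.created.get());
    }
    System.out.println("ok: " + distinct + " distinct clients");
  }
}
```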

