Repository: spark
Updated Branches:
  refs/heads/master 21ddd7d1e -> 383bf72c1


Cleanup on Connection, ConnectionManagerId, ConnectionManager classes part 2

Cleanup of the Connection, ConnectionManagerId, and ConnectionManager classes 
(part 2), done while I was working on the code there, to help the IDE:
1. Remove unused imports
2. Remove parentheses in method calls that do not have side effects.
3. Add parentheses in method calls that do have side effects or are not simple 
getters of object properties.
4. Replace the if-else check (via isInstanceOf) on the Connection class type 
with a Scala match expression for consistency and cleanliness.
5. Remove semicolons.
6. Remove extra spaces.
7. Remove redundant returns for consistency.

Author: Henry Saputra <henry.sapu...@gmail.com>

Closes #1157 from hsaputra/cleanup_connection_classes_part2 and squashes the 
following commits:

4be6906 [Henry Saputra] Fix Spark Scala style for line over 100 chars.
85b24f7 [Henry Saputra] Cleanup on Connection and ConnectionManager classes 
part 2 while I was working at the code there to help IDE: 1. Remove unused 
imports 2. Remove parentheses in method calls that do not have side effect. 3. 
Add parentheses in method calls that do have side effect. 4. Change if-else 
check (via isInstanceOf) for Connection class type with Scala expression for 
consistency and cleanliness. 5. Remove semicolon 6. Remove extra spaces.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/383bf72c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/383bf72c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/383bf72c

Branch: refs/heads/master
Commit: 383bf72c115b09d68cbde0d35ed89808ce04863d
Parents: 21ddd7d
Author: Henry Saputra <henry.sapu...@gmail.com>
Authored: Mon Jun 23 17:13:26 2014 -0700
Committer: Reynold Xin <r...@apache.org>
Committed: Mon Jun 23 17:13:26 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/network/Connection.scala   |  15 +--
 .../spark/network/ConnectionManager.scala       | 114 +++++++++----------
 .../spark/network/ConnectionManagerId.scala     |   2 +-
 3 files changed, 62 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/383bf72c/core/src/main/scala/org/apache/spark/network/Connection.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/Connection.scala 
b/core/src/main/scala/org/apache/spark/network/Connection.scala
index 3b6298a..5285ec8 100644
--- a/core/src/main/scala/org/apache/spark/network/Connection.scala
+++ b/core/src/main/scala/org/apache/spark/network/Connection.scala
@@ -17,11 +17,6 @@
 
 package org.apache.spark.network
 
-import org.apache.spark._
-import org.apache.spark.SparkSaslServer
-
-import scala.collection.mutable.{HashMap, Queue, ArrayBuffer}
-
 import java.net._
 import java.nio._
 import java.nio.channels._
@@ -41,7 +36,7 @@ abstract class Connection(val channel: SocketChannel, val 
selector: Selector,
   def this(channel_ : SocketChannel, selector_ : Selector, id_ : ConnectionId) 
= {
     this(channel_, selector_,
       ConnectionManagerId.fromSocketAddress(
-        
channel_.socket.getRemoteSocketAddress().asInstanceOf[InetSocketAddress]), id_)
+        
channel_.socket.getRemoteSocketAddress.asInstanceOf[InetSocketAddress]), id_)
   }
 
   channel.configureBlocking(false)
@@ -89,7 +84,7 @@ abstract class Connection(val channel: SocketChannel, val 
selector: Selector,
 
   private def disposeSasl() {
     if (sparkSaslServer != null) {
-      sparkSaslServer.dispose();
+      sparkSaslServer.dispose()
     }
 
     if (sparkSaslClient != null) {
@@ -328,15 +323,13 @@ class SendingConnection(val address: InetSocketAddress, 
selector_ : Selector,
       // Is highly unlikely unless there was an unclean close of socket, etc
       registerInterest()
       logInfo("Connected to [" + address + "], " + outbox.messages.size + " 
messages pending")
-      true
     } catch {
       case e: Exception => {
         logWarning("Error finishing connection to " + address, e)
         callOnExceptionCallback(e)
-        // ignore
-        return true
       }
     }
+    true
   }
 
   override def write(): Boolean = {
@@ -546,7 +539,7 @@ private[spark] class ReceivingConnection(
           /* println("Filled buffer at " + System.currentTimeMillis) */
           val bufferMessage = inbox.getMessageForChunk(currentChunk).get
           if (bufferMessage.isCompletelyReceived) {
-            bufferMessage.flip
+            bufferMessage.flip()
             bufferMessage.finishTime = System.currentTimeMillis
             logDebug("Finished receiving [" + bufferMessage + "] from " +
               "[" + getRemoteConnectionManagerId() + "] in " + 
bufferMessage.timeTaken)

http://git-wip-us.apache.org/repos/asf/spark/blob/383bf72c/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala 
b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
index cf1c985..8a1cdb8 100644
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
@@ -249,7 +249,7 @@ private[spark] class ConnectionManager(port: Int, conf: 
SparkConf,
   def run() {
     try {
       while(!selectorThread.isInterrupted) {
-        while (! registerRequests.isEmpty) {
+        while (!registerRequests.isEmpty) {
           val conn: SendingConnection = registerRequests.dequeue()
           addListeners(conn)
           conn.connect()
@@ -308,7 +308,7 @@ private[spark] class ConnectionManager(port: Int, conf: 
SparkConf,
               // Some keys within the selectors list are invalid/closed. clear 
them.
               val allKeys = selector.keys().iterator()
 
-              while (allKeys.hasNext()) {
+              while (allKeys.hasNext) {
                 val key = allKeys.next()
                 try {
                   if (! key.isValid) {
@@ -341,7 +341,7 @@ private[spark] class ConnectionManager(port: Int, conf: 
SparkConf,
 
         if (0 != selectedKeysCount) {
           val selectedKeys = selector.selectedKeys().iterator()
-          while (selectedKeys.hasNext()) {
+          while (selectedKeys.hasNext) {
             val key = selectedKeys.next
             selectedKeys.remove()
             try {
@@ -419,62 +419,63 @@ private[spark] class ConnectionManager(port: Int, conf: 
SparkConf,
     connectionsByKey -= connection.key
 
     try {
-      if (connection.isInstanceOf[SendingConnection]) {
-        val sendingConnection = connection.asInstanceOf[SendingConnection]
-        val sendingConnectionManagerId = 
sendingConnection.getRemoteConnectionManagerId()
-        logInfo("Removing SendingConnection to " + sendingConnectionManagerId)
-
-        connectionsById -= sendingConnectionManagerId
-        connectionsAwaitingSasl -= connection.connectionId
+      connection match {
+        case sendingConnection: SendingConnection =>
+          val sendingConnectionManagerId = 
sendingConnection.getRemoteConnectionManagerId()
+          logInfo("Removing SendingConnection to " + 
sendingConnectionManagerId)
+
+          connectionsById -= sendingConnectionManagerId
+          connectionsAwaitingSasl -= connection.connectionId
+
+          messageStatuses.synchronized {
+            messageStatuses.values.filter(_.connectionManagerId == 
sendingConnectionManagerId)
+              .foreach(status => {
+                logInfo("Notifying " + status)
+                status.synchronized {
+                  status.attempted = true
+                  status.acked = false
+                  status.markDone()
+                }
+              })
 
-        messageStatuses.synchronized {
-          messageStatuses
-            .values.filter(_.connectionManagerId == 
sendingConnectionManagerId).foreach(status => {
-              logInfo("Notifying " + status)
-              status.synchronized {
-              status.attempted = true
-               status.acked = false
-               status.markDone()
-              }
+            messageStatuses.retain((i, status) => {
+              status.connectionManagerId != sendingConnectionManagerId
             })
+          }
+        case receivingConnection: ReceivingConnection =>
+          val remoteConnectionManagerId = 
receivingConnection.getRemoteConnectionManagerId()
+          logInfo("Removing ReceivingConnection to " + 
remoteConnectionManagerId)
 
-          messageStatuses.retain((i, status) => {
-            status.connectionManagerId != sendingConnectionManagerId
-          })
-        }
-      } else if (connection.isInstanceOf[ReceivingConnection]) {
-        val receivingConnection = connection.asInstanceOf[ReceivingConnection]
-        val remoteConnectionManagerId = 
receivingConnection.getRemoteConnectionManagerId()
-        logInfo("Removing ReceivingConnection to " + remoteConnectionManagerId)
-
-        val sendingConnectionOpt = 
connectionsById.get(remoteConnectionManagerId)
-          if (! sendingConnectionOpt.isDefined) {
-          logError("Corresponding SendingConnectionManagerId not found")
-          return
-        }
+          val sendingConnectionOpt = 
connectionsById.get(remoteConnectionManagerId)
+          if (!sendingConnectionOpt.isDefined) {
+            logError("Corresponding SendingConnectionManagerId not found")
+            return
+          }
 
-        val sendingConnection = sendingConnectionOpt.get
-        connectionsById -= remoteConnectionManagerId
-        sendingConnection.close()
+          val sendingConnection = sendingConnectionOpt.get
+          connectionsById -= remoteConnectionManagerId
+          sendingConnection.close()
 
-        val sendingConnectionManagerId = 
sendingConnection.getRemoteConnectionManagerId()
+          val sendingConnectionManagerId = 
sendingConnection.getRemoteConnectionManagerId()
 
-        assert (sendingConnectionManagerId == remoteConnectionManagerId)
+          assert(sendingConnectionManagerId == remoteConnectionManagerId)
 
-        messageStatuses.synchronized {
-          for (s <- messageStatuses.values if s.connectionManagerId == 
sendingConnectionManagerId) {
-            logInfo("Notifying " + s)
-            s.synchronized {
-              s.attempted = true
-              s.acked = false
-              s.markDone()
+          messageStatuses.synchronized {
+            for (s <- messageStatuses.values
+                 if s.connectionManagerId == sendingConnectionManagerId) {
+              logInfo("Notifying " + s)
+              s.synchronized {
+                s.attempted = true
+                s.acked = false
+                s.markDone()
+              }
             }
-          }
 
-          messageStatuses.retain((i, status) => {
-            status.connectionManagerId != sendingConnectionManagerId
-          })
-        }
+            messageStatuses.retain((i, status) => {
+              status.connectionManagerId != sendingConnectionManagerId
+            })
+          }
+        case _ => logError("Unsupported type of connection.")
       }
     } finally {
       // So that the selection keys can be removed.
@@ -517,13 +518,13 @@ private[spark] class ConnectionManager(port: Int, conf: 
SparkConf,
       logDebug("Client sasl completed for id: "  + waitingConn.connectionId)
       connectionsAwaitingSasl -= waitingConn.connectionId
       waitingConn.getAuthenticated().synchronized {
-        waitingConn.getAuthenticated().notifyAll();
+        waitingConn.getAuthenticated().notifyAll()
       }
       return
     } else {
       var replyToken : Array[Byte] = null
       try {
-        replyToken = 
waitingConn.sparkSaslClient.saslResponse(securityMsg.getToken);
+        replyToken = 
waitingConn.sparkSaslClient.saslResponse(securityMsg.getToken)
         if (waitingConn.isSaslComplete()) {
           logDebug("Client sasl completed after evaluate for id: " + 
waitingConn.connectionId)
           connectionsAwaitingSasl -= waitingConn.connectionId
@@ -533,7 +534,7 @@ private[spark] class ConnectionManager(port: Int, conf: 
SparkConf,
           return
         }
         val securityMsgResp = SecurityMessage.fromResponse(replyToken,
-          securityMsg.getConnectionId.toString())
+          securityMsg.getConnectionId.toString)
         val message = securityMsgResp.toBufferMessage
         if (message == null) throw new Exception("Error creating security 
message")
         sendSecurityMessage(waitingConn.getRemoteConnectionManagerId(), 
message)
@@ -630,13 +631,13 @@ private[spark] class ConnectionManager(port: Int, conf: 
SparkConf,
       case bufferMessage: BufferMessage => {
         if (authEnabled) {
           val res = handleAuthentication(connection, bufferMessage)
-          if (res == true) {
+          if (res) {
             // message was security negotiation so skip the rest
             logDebug("After handleAuth result was true, returning")
             return
           }
         }
-        if (bufferMessage.hasAckId) {
+        if (bufferMessage.hasAckId()) {
           val sentMessageStatus = messageStatuses.synchronized {
             messageStatuses.get(bufferMessage.ackId) match {
               case Some(status) => {
@@ -646,7 +647,6 @@ private[spark] class ConnectionManager(port: Int, conf: 
SparkConf,
               case None => {
                 throw new Exception("Could not find reference for received ack 
message " +
                   message.id)
-                null
               }
             }
           }
@@ -668,7 +668,7 @@ private[spark] class ConnectionManager(port: Int, conf: 
SparkConf,
           if (ackMessage.isDefined) {
             if (!ackMessage.get.isInstanceOf[BufferMessage]) {
               logDebug("Response to " + bufferMessage + " is not a buffer 
message, it is of type "
-                + ackMessage.get.getClass())
+                + ackMessage.get.getClass)
             } else if (!ackMessage.get.asInstanceOf[BufferMessage].hasAckId) {
               logDebug("Response to " + bufferMessage + " does not have ack id 
set")
               ackMessage.get.asInstanceOf[BufferMessage].ackId = 
bufferMessage.id

http://git-wip-us.apache.org/repos/asf/spark/blob/383bf72c/core/src/main/scala/org/apache/spark/network/ConnectionManagerId.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/network/ConnectionManagerId.scala 
b/core/src/main/scala/org/apache/spark/network/ConnectionManagerId.scala
index b82edb6..57f7586 100644
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManagerId.scala
+++ b/core/src/main/scala/org/apache/spark/network/ConnectionManagerId.scala
@@ -32,6 +32,6 @@ private[spark] case class ConnectionManagerId(host: String, 
port: Int) {
 
 private[spark] object ConnectionManagerId {
   def fromSocketAddress(socketAddress: InetSocketAddress): ConnectionManagerId 
= {
-    new ConnectionManagerId(socketAddress.getHostName(), 
socketAddress.getPort())
+    new ConnectionManagerId(socketAddress.getHostName, socketAddress.getPort)
   }
 }

Reply via email to