Repository: spark Updated Branches: refs/heads/master 21ddd7d1e -> 383bf72c1
Cleanup on Connection, ConnectionManagerId, ConnectionManager classes part 2 Cleanup on Connection, ConnectionManagerId, and ConnectionManager classes part 2 while I was working at the code there to help IDE: 1. Remove unused imports 2. Remove parentheses in method calls that do not have side affect. 3. Add parentheses in method calls that do have side effect or not simple get to object properties. 4. Change if-else check (via isInstanceOf) for Connection class type with Scala expression for consistency and cleanliness. 5. Remove semicolon 6. Remove extra spaces. 7. Remove redundant return for consistency Author: Henry Saputra <henry.sapu...@gmail.com> Closes #1157 from hsaputra/cleanup_connection_classes_part2 and squashes the following commits: 4be6906 [Henry Saputra] Fix Spark Scala style for line over 100 chars. 85b24f7 [Henry Saputra] Cleanup on Connection and ConnectionManager classes part 2 while I was working at the code there to help IDE: 1. Remove unused imports 2. Remove parentheses in method calls that do not have side affect. 3. Add parentheses in method calls that do have side effect. 4. Change if-else check (via isInstanceOf) for Connection class type with Scala expression for consitency and cleanliness. 5. Remove semicolon 6. Remove extra spaces. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/383bf72c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/383bf72c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/383bf72c Branch: refs/heads/master Commit: 383bf72c115b09d68cbde0d35ed89808ce04863d Parents: 21ddd7d Author: Henry Saputra <henry.sapu...@gmail.com> Authored: Mon Jun 23 17:13:26 2014 -0700 Committer: Reynold Xin <r...@apache.org> Committed: Mon Jun 23 17:13:26 2014 -0700 ---------------------------------------------------------------------- .../org/apache/spark/network/Connection.scala | 15 +-- .../spark/network/ConnectionManager.scala | 114 +++++++++---------- .../spark/network/ConnectionManagerId.scala | 2 +- 3 files changed, 62 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/383bf72c/core/src/main/scala/org/apache/spark/network/Connection.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/network/Connection.scala b/core/src/main/scala/org/apache/spark/network/Connection.scala index 3b6298a..5285ec8 100644 --- a/core/src/main/scala/org/apache/spark/network/Connection.scala +++ b/core/src/main/scala/org/apache/spark/network/Connection.scala @@ -17,11 +17,6 @@ package org.apache.spark.network -import org.apache.spark._ -import org.apache.spark.SparkSaslServer - -import scala.collection.mutable.{HashMap, Queue, ArrayBuffer} - import java.net._ import java.nio._ import java.nio.channels._ @@ -41,7 +36,7 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector, def this(channel_ : SocketChannel, selector_ : Selector, id_ : ConnectionId) = { this(channel_, selector_, ConnectionManagerId.fromSocketAddress( - channel_.socket.getRemoteSocketAddress().asInstanceOf[InetSocketAddress]), id_) + channel_.socket.getRemoteSocketAddress.asInstanceOf[InetSocketAddress]), id_) } channel.configureBlocking(false) @@ -89,7 +84,7 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector, private def disposeSasl() { if (sparkSaslServer != null) { - sparkSaslServer.dispose(); + sparkSaslServer.dispose() } if (sparkSaslClient != null) { @@ -328,15 +323,13 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector, // Is highly unlikely unless there was an unclean close of socket, etc registerInterest() logInfo("Connected to [" + address + "], " + outbox.messages.size + " messages pending") - true } catch { case e: Exception => { logWarning("Error finishing connection to " + address, e) callOnExceptionCallback(e) - // ignore - return true } } + true } override def write(): Boolean = { @@ -546,7 +539,7 @@ private[spark] class ReceivingConnection( /* println("Filled buffer at " + System.currentTimeMillis) */ val bufferMessage = inbox.getMessageForChunk(currentChunk).get if (bufferMessage.isCompletelyReceived) { - bufferMessage.flip + bufferMessage.flip() bufferMessage.finishTime = System.currentTimeMillis logDebug("Finished receiving [" + bufferMessage + "] from " + "[" + getRemoteConnectionManagerId() + "] in " + bufferMessage.timeTaken) http://git-wip-us.apache.org/repos/asf/spark/blob/383bf72c/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala index cf1c985..8a1cdb8 100644 --- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala @@ -249,7 +249,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, def run() { try { while(!selectorThread.isInterrupted) { - while (! registerRequests.isEmpty) { + while (!registerRequests.isEmpty) { val conn: SendingConnection = registerRequests.dequeue() addListeners(conn) conn.connect() @@ -308,7 +308,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, // Some keys within the selectors list are invalid/closed. clear them. val allKeys = selector.keys().iterator() - while (allKeys.hasNext()) { + while (allKeys.hasNext) { val key = allKeys.next() try { if (! key.isValid) { @@ -341,7 +341,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, if (0 != selectedKeysCount) { val selectedKeys = selector.selectedKeys().iterator() - while (selectedKeys.hasNext()) { + while (selectedKeys.hasNext) { val key = selectedKeys.next selectedKeys.remove() try { @@ -419,62 +419,63 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, connectionsByKey -= connection.key try { - if (connection.isInstanceOf[SendingConnection]) { - val sendingConnection = connection.asInstanceOf[SendingConnection] - val sendingConnectionManagerId = sendingConnection.getRemoteConnectionManagerId() - logInfo("Removing SendingConnection to " + sendingConnectionManagerId) - - connectionsById -= sendingConnectionManagerId - connectionsAwaitingSasl -= connection.connectionId + connection match { + case sendingConnection: SendingConnection => + val sendingConnectionManagerId = sendingConnection.getRemoteConnectionManagerId() + logInfo("Removing SendingConnection to " + sendingConnectionManagerId) + + connectionsById -= sendingConnectionManagerId + connectionsAwaitingSasl -= connection.connectionId + + messageStatuses.synchronized { + messageStatuses.values.filter(_.connectionManagerId == sendingConnectionManagerId) + .foreach(status => { + logInfo("Notifying " + status) + status.synchronized { + status.attempted = true + status.acked = false + status.markDone() + } + }) - messageStatuses.synchronized { - messageStatuses - .values.filter(_.connectionManagerId == sendingConnectionManagerId).foreach(status => { - logInfo("Notifying " + status) - status.synchronized { - status.attempted = true - status.acked = false - status.markDone() - } + messageStatuses.retain((i, status) => { + status.connectionManagerId != sendingConnectionManagerId }) + } + case receivingConnection: ReceivingConnection => + val remoteConnectionManagerId = receivingConnection.getRemoteConnectionManagerId() + logInfo("Removing ReceivingConnection to " + remoteConnectionManagerId) - messageStatuses.retain((i, status) => { - status.connectionManagerId != sendingConnectionManagerId - }) - } - } else if (connection.isInstanceOf[ReceivingConnection]) { - val receivingConnection = connection.asInstanceOf[ReceivingConnection] - val remoteConnectionManagerId = receivingConnection.getRemoteConnectionManagerId() - logInfo("Removing ReceivingConnection to " + remoteConnectionManagerId) - - val sendingConnectionOpt = connectionsById.get(remoteConnectionManagerId) - if (! sendingConnectionOpt.isDefined) { - logError("Corresponding SendingConnectionManagerId not found") - return - } + val sendingConnectionOpt = connectionsById.get(remoteConnectionManagerId) + if (!sendingConnectionOpt.isDefined) { + logError("Corresponding SendingConnectionManagerId not found") + return + } - val sendingConnection = sendingConnectionOpt.get - connectionsById -= remoteConnectionManagerId - sendingConnection.close() + val sendingConnection = sendingConnectionOpt.get + connectionsById -= remoteConnectionManagerId + sendingConnection.close() - val sendingConnectionManagerId = sendingConnection.getRemoteConnectionManagerId() + val sendingConnectionManagerId = sendingConnection.getRemoteConnectionManagerId() - assert (sendingConnectionManagerId == remoteConnectionManagerId) + assert(sendingConnectionManagerId == remoteConnectionManagerId) - messageStatuses.synchronized { - for (s <- messageStatuses.values if s.connectionManagerId == sendingConnectionManagerId) { - logInfo("Notifying " + s) - s.synchronized { - s.attempted = true - s.acked = false - s.markDone() + messageStatuses.synchronized { + for (s <- messageStatuses.values + if s.connectionManagerId == sendingConnectionManagerId) { + logInfo("Notifying " + s) + s.synchronized { + s.attempted = true + s.acked = false + s.markDone() + } } - } - messageStatuses.retain((i, status) => { - status.connectionManagerId != sendingConnectionManagerId - }) - } + messageStatuses.retain((i, status) => { + status.connectionManagerId != sendingConnectionManagerId + }) + } + case _ => logError("Unsupported type of connection.") } } finally { // So that the selection keys can be removed. @@ -517,13 +518,13 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, logDebug("Client sasl completed for id: " + waitingConn.connectionId) connectionsAwaitingSasl -= waitingConn.connectionId waitingConn.getAuthenticated().synchronized { - waitingConn.getAuthenticated().notifyAll(); + waitingConn.getAuthenticated().notifyAll() } return } else { var replyToken : Array[Byte] = null try { - replyToken = waitingConn.sparkSaslClient.saslResponse(securityMsg.getToken); + replyToken = waitingConn.sparkSaslClient.saslResponse(securityMsg.getToken) if (waitingConn.isSaslComplete()) { logDebug("Client sasl completed after evaluate for id: " + waitingConn.connectionId) connectionsAwaitingSasl -= waitingConn.connectionId @@ -533,7 +534,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, return } val securityMsgResp = SecurityMessage.fromResponse(replyToken, - securityMsg.getConnectionId.toString()) + securityMsg.getConnectionId.toString) val message = securityMsgResp.toBufferMessage if (message == null) throw new Exception("Error creating security message") sendSecurityMessage(waitingConn.getRemoteConnectionManagerId(), message) @@ -630,13 +631,13 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, case bufferMessage: BufferMessage => { if (authEnabled) { val res = handleAuthentication(connection, bufferMessage) - if (res == true) { + if (res) { // message was security negotiation so skip the rest logDebug("After handleAuth result was true, returning") return } } - if (bufferMessage.hasAckId) { + if (bufferMessage.hasAckId()) { val sentMessageStatus = messageStatuses.synchronized { messageStatuses.get(bufferMessage.ackId) match { case Some(status) => { @@ -646,7 +647,6 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, case None => { throw new Exception("Could not find reference for received ack message " + message.id) - null } } } @@ -668,7 +668,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, if (ackMessage.isDefined) { if (!ackMessage.get.isInstanceOf[BufferMessage]) { logDebug("Response to " + bufferMessage + " is not a buffer message, it is of type " - + ackMessage.get.getClass()) + + ackMessage.get.getClass) } else if (!ackMessage.get.asInstanceOf[BufferMessage].hasAckId) { logDebug("Response to " + bufferMessage + " does not have ack id set") ackMessage.get.asInstanceOf[BufferMessage].ackId = bufferMessage.id http://git-wip-us.apache.org/repos/asf/spark/blob/383bf72c/core/src/main/scala/org/apache/spark/network/ConnectionManagerId.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManagerId.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManagerId.scala index b82edb6..57f7586 100644 --- a/core/src/main/scala/org/apache/spark/network/ConnectionManagerId.scala +++ b/core/src/main/scala/org/apache/spark/network/ConnectionManagerId.scala @@ -32,6 +32,6 @@ private[spark] case class ConnectionManagerId(host: String, port: Int) { private[spark] object ConnectionManagerId { def fromSocketAddress(socketAddress: InetSocketAddress): ConnectionManagerId = { - new ConnectionManagerId(socketAddress.getHostName(), socketAddress.getPort()) + new ConnectionManagerId(socketAddress.getHostName, socketAddress.getPort) } }