Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/incubator-spark/pull/332#discussion_r9921913
  
    --- Diff: 
core/src/main/scala/org/apache/spark/network/ConnectionManager.scala ---
    @@ -483,10 +496,131 @@ private[spark] class ConnectionManager(port: Int, 
conf: SparkConf) extends Loggi
         /*handleMessage(connection, message)*/
       }
     
    -  private def handleMessage(connectionManagerId: ConnectionManagerId, 
message: Message) {
    +  private def handleClientAuthNeg(
    +      waitingConn: SendingConnection,
    +      securityMsg: SecurityMessage, 
    +      connectionId : ConnectionId) {
    +    if (waitingConn.isSaslComplete()) {
    +      logDebug("Client sasl completed for id: "  + 
waitingConn.connectionId)
    +      connectionsAwaitingSasl -= waitingConn.connectionId
    +      waitingConn.getAuthenticated().synchronized {
    +        waitingConn.getAuthenticated().notifyAll();
    +      }
    +      return
    +    } else {
    +      var replyToken : Array[Byte] = null
    +      try {
    +        replyToken = 
waitingConn.sparkSaslClient.saslResponse(securityMsg.getToken);
    +        if (waitingConn.isSaslComplete()) {
    +          logDebug("Client sasl completed after evaluate for id: " + 
waitingConn.connectionId)
    +          connectionsAwaitingSasl -= waitingConn.connectionId
    +          waitingConn.getAuthenticated().synchronized {
    +            waitingConn.getAuthenticated().notifyAll()
    +          }
    +          return
    +        }
    +        var securityMsgResp = SecurityMessage.fromResponse(replyToken, 
securityMsg.getConnectionId)
    +        var message = securityMsgResp.toBufferMessage
    +        if (message == null) throw new Exception("Error creating security 
message")
    +        sendSecurityMessage(waitingConn.getRemoteConnectionManagerId(), 
message)
    +      } catch  {
    +        case e: Exception => {
    +          logError("Error doing sasl client: " + e)
    +          waitingConn.close()
    +          throw new Exception("error evaluating sasl response: " + e)
    +        }
    +      }
    +    }
    +  }
    +
    +  private def handleServerAuthNeg(
    +      connection: Connection, 
    +      securityMsg: SecurityMessage,
    +      connectionId: ConnectionId) {
    +    if (!connection.isSaslComplete()) {
    +      logDebug("saslContext not established")
    +      var replyToken : Array[Byte] = null
    +      try {
    +        connection.synchronized {
    +          if (connection.sparkSaslServer == null) {
    +            logDebug("Creating sasl Server")
    +            connection.sparkSaslServer = new 
SparkSaslServer(securityManager)
    +          }
    +        }
    +        replyToken = 
connection.sparkSaslServer.response(securityMsg.getToken)
    +        if (connection.isSaslComplete()) {
    +          logDebug("Server sasl completed: " + connection.connectionId) 
    +        } else {
    +          logDebug("Server sasl not completed: " + connection.connectionId)
    +        }
    +        if (replyToken != null) {
    +          var securityMsgResp = SecurityMessage.fromResponse(replyToken, 
securityMsg.getConnectionId)
    +          var message = securityMsgResp.toBufferMessage
    +          if (message == null) throw new Exception("Error creating 
security Message")
    +          sendSecurityMessage(connection.getRemoteConnectionManagerId(), 
message)
    +        } 
    +      } catch {
    +        case e: Exception => {
    +          logError("Error in server auth negotiation: " + e)
    +          // It would probably be better to send an error message telling 
other side auth failed
    +          // but for now just close
    +          connection.close()
    +        }
    +      }
    +    } else {
    +      logDebug("connection already established for this connection id: " + 
connection.connectionId) 
    +    }
    +  }
    +
    +
    +  private def handleAuthentication(conn: Connection, bufferMessage: 
BufferMessage): Boolean = {
    +    if (bufferMessage.isSecurityNeg) {
    +      logDebug("This is security neg message")
    +
    +      // parse as SecurityMessage
    +      val securityMsg = SecurityMessage.fromBufferMessage(bufferMessage)
    +      val connectionId = new ConnectionId(securityMsg.getConnectionId)
    +
    +      connectionsAwaitingSasl.get(connectionId) match {
    +        case Some(waitingConn) => {
    +          // Client - this must be in response to us doing Send
    +          logDebug("Client handleAuth for id: " +  
waitingConn.connectionId)
    +          handleClientAuthNeg(waitingConn, securityMsg, connectionId)
    +        }
    +        case None => {
    +          // Server - someone sent us something and we haven't 
authenticated yet
    +          logDebug("Server handleAuth for id: " + connectionId)
    +          handleServerAuthNeg(conn, securityMsg, connectionId)
    +        }
    +      }
    --- End diff --
    
    Not sure I follow. In both cases we are receiving messages on a 
ReceivingConnection so there is no way to differentiate on the connection it 
was sent on.  The type stored in connectionsAwaitingSasl is only 
SendingConnections.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. To do so, please top-post your response.
If your project does not have this feature enabled and wishes so, or if the
feature is enabled but not working, please contact infrastructure at
infrastruct...@apache.org or file a JIRA ticket with INFRA.
---

Reply via email to