Github user tgravescs commented on a diff in the pull request: https://github.com/apache/incubator-spark/pull/332#discussion_r9921913 --- Diff: core/src/main/scala/org/apache/spark/network/ConnectionManager.scala --- @@ -483,10 +496,131 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi /*handleMessage(connection, message)*/ } - private def handleMessage(connectionManagerId: ConnectionManagerId, message: Message) { + private def handleClientAuthNeg( + waitingConn: SendingConnection, + securityMsg: SecurityMessage, + connectionId : ConnectionId) { + if (waitingConn.isSaslComplete()) { + logDebug("Client sasl completed for id: " + waitingConn.connectionId) + connectionsAwaitingSasl -= waitingConn.connectionId + waitingConn.getAuthenticated().synchronized { + waitingConn.getAuthenticated().notifyAll(); + } + return + } else { + var replyToken : Array[Byte] = null + try { + replyToken = waitingConn.sparkSaslClient.saslResponse(securityMsg.getToken); + if (waitingConn.isSaslComplete()) { + logDebug("Client sasl completed after evaluate for id: " + waitingConn.connectionId) + connectionsAwaitingSasl -= waitingConn.connectionId + waitingConn.getAuthenticated().synchronized { + waitingConn.getAuthenticated().notifyAll() + } + return + } + var securityMsgResp = SecurityMessage.fromResponse(replyToken, securityMsg.getConnectionId) + var message = securityMsgResp.toBufferMessage + if (message == null) throw new Exception("Error creating security message") + sendSecurityMessage(waitingConn.getRemoteConnectionManagerId(), message) + } catch { + case e: Exception => { + logError("Error doing sasl client: " + e) + waitingConn.close() + throw new Exception("error evaluating sasl response: " + e) + } + } + } + } + + private def handleServerAuthNeg( + connection: Connection, + securityMsg: SecurityMessage, + connectionId: ConnectionId) { + if (!connection.isSaslComplete()) { + logDebug("saslContext not established") + var replyToken : Array[Byte] = null + try { + connection.synchronized { + if (connection.sparkSaslServer == null) { + logDebug("Creating sasl Server") + connection.sparkSaslServer = new SparkSaslServer(securityManager) + } + } + replyToken = connection.sparkSaslServer.response(securityMsg.getToken) + if (connection.isSaslComplete()) { + logDebug("Server sasl completed: " + connection.connectionId) + } else { + logDebug("Server sasl not completed: " + connection.connectionId) + } + if (replyToken != null) { + var securityMsgResp = SecurityMessage.fromResponse(replyToken, securityMsg.getConnectionId) + var message = securityMsgResp.toBufferMessage + if (message == null) throw new Exception("Error creating security Message") + sendSecurityMessage(connection.getRemoteConnectionManagerId(), message) + } + } catch { + case e: Exception => { + logError("Error in server auth negotiation: " + e) + // It would probably be better to send an error message telling other side auth failed + // but for now just close + connection.close() + } + } + } else { + logDebug("connection already established for this connection id: " + connection.connectionId) + } + } + + + private def handleAuthentication(conn: Connection, bufferMessage: BufferMessage): Boolean = { + if (bufferMessage.isSecurityNeg) { + logDebug("This is security neg message") + + // parse as SecurityMessage + val securityMsg = SecurityMessage.fromBufferMessage(bufferMessage) + val connectionId = new ConnectionId(securityMsg.getConnectionId) + + connectionsAwaitingSasl.get(connectionId) match { + case Some(waitingConn) => { + // Client - this must be in response to us doing Send + logDebug("Client handleAuth for id: " + waitingConn.connectionId) + handleClientAuthNeg(waitingConn, securityMsg, connectionId) + } + case None => { + // Server - someone sent us something and we haven't authenticated yet + logDebug("Server handleAuth for id: " + connectionId) + handleServerAuthNeg(conn, securityMsg, connectionId) + } + } --- End diff -- Not sure I follow. In both cases we are receiving messages on a ReceivingConnection so there is no way to differentiate on the connection it was sent on. The type stored in connectionsAwaitingSasl is only SendingConnections.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. To do so, please top-post your response. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---