Github user tgravescs commented on a diff in the pull request:
https://github.com/apache/incubator-spark/pull/332#discussion_r9921913
--- Diff:
core/src/main/scala/org/apache/spark/network/ConnectionManager.scala ---
@@ -483,10 +496,131 @@ private[spark] class ConnectionManager(port: Int,
conf: SparkConf) extends Loggi
/*handleMessage(connection, message)*/
}
- private def handleMessage(connectionManagerId: ConnectionManagerId,
message: Message) {
+ private def handleClientAuthNeg(
+ waitingConn: SendingConnection,
+ securityMsg: SecurityMessage,
+ connectionId : ConnectionId) {
+ if (waitingConn.isSaslComplete()) {
+ logDebug("Client sasl completed for id: " +
waitingConn.connectionId)
+ connectionsAwaitingSasl -= waitingConn.connectionId
+ waitingConn.getAuthenticated().synchronized {
+ waitingConn.getAuthenticated().notifyAll();
+ }
+ return
+ } else {
+ var replyToken : Array[Byte] = null
+ try {
+ replyToken =
waitingConn.sparkSaslClient.saslResponse(securityMsg.getToken);
+ if (waitingConn.isSaslComplete()) {
+ logDebug("Client sasl completed after evaluate for id: " +
waitingConn.connectionId)
+ connectionsAwaitingSasl -= waitingConn.connectionId
+ waitingConn.getAuthenticated().synchronized {
+ waitingConn.getAuthenticated().notifyAll()
+ }
+ return
+ }
+ var securityMsgResp = SecurityMessage.fromResponse(replyToken,
securityMsg.getConnectionId)
+ var message = securityMsgResp.toBufferMessage
+ if (message == null) throw new Exception("Error creating security
message")
+ sendSecurityMessage(waitingConn.getRemoteConnectionManagerId(),
message)
+ } catch {
+ case e: Exception => {
+ logError("Error doing sasl client: " + e)
+ waitingConn.close()
+ throw new Exception("error evaluating sasl response: " + e)
+ }
+ }
+ }
+ }
+
+ private def handleServerAuthNeg(
+ connection: Connection,
+ securityMsg: SecurityMessage,
+ connectionId: ConnectionId) {
+ if (!connection.isSaslComplete()) {
+ logDebug("saslContext not established")
+ var replyToken : Array[Byte] = null
+ try {
+ connection.synchronized {
+ if (connection.sparkSaslServer == null) {
+ logDebug("Creating sasl Server")
+ connection.sparkSaslServer = new
SparkSaslServer(securityManager)
+ }
+ }
+ replyToken =
connection.sparkSaslServer.response(securityMsg.getToken)
+ if (connection.isSaslComplete()) {
+ logDebug("Server sasl completed: " + connection.connectionId)
+ } else {
+ logDebug("Server sasl not completed: " + connection.connectionId)
+ }
+ if (replyToken != null) {
+ var securityMsgResp = SecurityMessage.fromResponse(replyToken,
securityMsg.getConnectionId)
+ var message = securityMsgResp.toBufferMessage
+ if (message == null) throw new Exception("Error creating
security Message")
+ sendSecurityMessage(connection.getRemoteConnectionManagerId(),
message)
+ }
+ } catch {
+ case e: Exception => {
+ logError("Error in server auth negotiation: " + e)
+ // It would probably be better to send an error message telling
other side auth failed
+ // but for now just close
+ connection.close()
+ }
+ }
+ } else {
+ logDebug("connection already established for this connection id: " +
connection.connectionId)
+ }
+ }
+
+
+ private def handleAuthentication(conn: Connection, bufferMessage:
BufferMessage): Boolean = {
+ if (bufferMessage.isSecurityNeg) {
+ logDebug("This is security neg message")
+
+ // parse as SecurityMessage
+ val securityMsg = SecurityMessage.fromBufferMessage(bufferMessage)
+ val connectionId = new ConnectionId(securityMsg.getConnectionId)
+
+ connectionsAwaitingSasl.get(connectionId) match {
+ case Some(waitingConn) => {
+ // Client - this must be in response to us doing Send
+ logDebug("Client handleAuth for id: " +
waitingConn.connectionId)
+ handleClientAuthNeg(waitingConn, securityMsg, connectionId)
+ }
+ case None => {
+ // Server - someone sent us something and we haven't
authenticated yet
+ logDebug("Server handleAuth for id: " + connectionId)
+ handleServerAuthNeg(conn, securityMsg, connectionId)
+ }
+ }
--- End diff --
Not sure I follow. In both cases we are receiving messages on a
ReceivingConnection so there is no way to differentiate on the connection it
was sent on. The type stored in connectionsAwaitingSasl is only
SendingConnections.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. To do so, please top-post your response.
If your project does not have this feature enabled and wishes so, or if the
feature is enabled but not working, please contact infrastructure at
[email protected] or file a JIRA ticket with INFRA.
---