Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r45512856
  
    --- Diff: 
storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslServerHandler.java
 ---
    @@ -0,0 +1,131 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.messaging.netty;
    +
    +import java.io.IOException;
    +import java.util.List;
    +import java.util.Map;
    +import org.jboss.netty.channel.Channel;
    +import org.jboss.netty.channel.ChannelHandlerContext;
    +import org.jboss.netty.channel.Channels;
    +import org.jboss.netty.channel.ExceptionEvent;
    +import org.jboss.netty.channel.MessageEvent;
    +import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class KerberosSaslServerHandler extends 
SimpleChannelUpstreamHandler {
    +
    +    ISaslServer server;
    +    /** Used for client or server's token to send or receive from each 
other. */
    +    private Map storm_conf;
    +    private String jaas_section;
    +    private List<String> authorizedUsers;
    +
    +    private static final Logger LOG = LoggerFactory
    +        .getLogger(KerberosSaslServerHandler.class);
    +
    +    public KerberosSaslServerHandler(ISaslServer server, Map storm_conf, 
String jaas_section, List<String> authorizedUsers) throws IOException {
    +        this.server = server;
    +        this.storm_conf = storm_conf;
    +        this.jaas_section = jaas_section;
    +        this.authorizedUsers = authorizedUsers;
    +    }
    +
    +    @Override
    +    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
    +        throws Exception {
    +        Object msg = e.getMessage();
    +        if (msg == null)
    +            return;
    +
    +        Channel channel = ctx.getChannel();
    +
    +
    +        if (msg instanceof SaslMessageToken) {
    +            // initialize server-side SASL functionality, if we haven't yet
    +            // (in which case we are looking at the first SASL message 
from the
    +            // client).
    +            try {
    +                LOG.debug("Got SaslMessageToken!");
    +
    +                KerberosSaslNettyServer saslNettyServer = 
KerberosSaslNettyServerState.getKerberosSaslNettyServer
    +                    .get(channel);
    +                if (saslNettyServer == null) {
    +                    LOG.debug("No saslNettyServer for {}  yet; creating 
now, with topology token: ", channel);
    +                    try {
    +                        saslNettyServer = new 
KerberosSaslNettyServer(storm_conf, jaas_section, authorizedUsers);
    +                    } catch (RuntimeException ioe) {
    +                        LOG.error("Error occurred while creating 
saslNettyServer on server {} for client {}",
    +                                  channel.getLocalAddress(), 
channel.getRemoteAddress());
    +                        saslNettyServer = null;
    +                    }
    +
    +                    
KerberosSaslNettyServerState.getKerberosSaslNettyServer.set(channel,
    +                                                                           
     saslNettyServer);
    +                } else {
    +                    LOG.debug("Found existing saslNettyServer on server: 
{} for client {}",
    +                              channel.getLocalAddress(), 
channel.getRemoteAddress());
    +                }
    +
    +                byte[] responseBytes = 
saslNettyServer.response(((SaslMessageToken) msg)
    +                                                                
.getSaslToken());
    +
    +                SaslMessageToken saslTokenMessageRequest = new 
SaslMessageToken(responseBytes);
    +
    +                if(saslTokenMessageRequest.getSaslToken() == null) {
    +                    channel.write(ControlMessage.SASL_COMPLETE_REQUEST);
    +                } else {
    +                    // Send response to client.
    +                    channel.write(saslTokenMessageRequest);
    +                }
    +
    +                if (saslNettyServer.isComplete()) {
    +                    // If authentication of client is complete, we will 
also send a
    +                    // SASL-Complete message to the client.
    +                    LOG.info("SASL authentication is complete for client 
with username: {}",
    +                             saslNettyServer.getUserName());
    +                    channel.write(ControlMessage.SASL_COMPLETE_REQUEST);
    +                    LOG.debug("Removing SaslServerHandler from pipeline 
since SASL authentication is complete.");
    +                    ctx.getPipeline().remove(this);
    +                    server.authenticated(channel);
    +                }
    +                return;
    +            }
    +            catch (Exception ex) {
    +                LOG.error("Failed to handle SaslMessageToken: ", ex);
    +                throw ex;
    +            }
    +        } else {
    +            // Client should not be sending other-than-SASL messages before
    +            // SaslServerHandler has removed itself from the pipeline. Such
    +            // non-SASL requests will be denied by the Authorize channel 
handler
    +            // (the next handler upstream in the server pipeline) if SASL
    +            // authentication has not completed.
    +            LOG.warn("Sending upstream an unexpected non-SASL message : 
{}",
    +                     msg);
    +            Channels.fireMessageReceived(ctx, msg);
    +        }
    +    }
    +
    +    @Override
    +    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent 
e) {
    +        if(server != null) server.closeChannel(e.getChannel());
    --- End diff --
    
    minor: braces on `if`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to