Added: giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SaslTokenMessageRequest.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SaslTokenMessageRequest.java?rev=1396722&view=auto ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SaslTokenMessageRequest.java (added) +++ giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SaslTokenMessageRequest.java Wed Oct 10 18:35:31 2012 @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.comm.requests; + +import org.apache.giraph.comm.netty.SaslNettyServer; +import org.apache.log4j.Logger; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * Send and receive SASL tokens. + */ +public class SaslTokenMessageRequest extends WritableRequest { + /** Class logger */ + private static final Logger LOG = + Logger.getLogger(SaslTokenMessageRequest.class); + + /** Used for client or server's token to send or receive from each other. */ + private byte[] token; + + /** + * Constructor used for reflection only. + */ + public SaslTokenMessageRequest() { } + + /** + * Constructor used to send request. + * + * @param token the SASL token, generated by a SaslClient or SaslServer. + */ + public SaslTokenMessageRequest(byte[] token) { + this.token = token; + } + + /** + * Read accessor for SASL token + * + * @return saslToken SASL token + */ + public byte[] getSaslToken() { + return token; + } + + /** + * Write accessor for SASL token + * + * @param token SASL token + */ + public void setSaslToken(byte[] token) { + this.token = token; + } + + @Override + public RequestType getType() { + return RequestType.SASL_TOKEN_MESSAGE_REQUEST; + } + + @Override + public void readFieldsRequest(DataInput input) throws IOException { + int tokenSize = input.readInt(); + token = new byte[tokenSize]; + input.readFully(token); + } + + /** + * Update server's token in response to the SASL token received from + * client. Updated token is sent to client by + * SaslServerHandler.messageReceived(). + * + * @param saslNettyServer used to create response. + */ + + public void processToken(SaslNettyServer saslNettyServer) { + if (LOG.isDebugEnabled()) { + LOG.debug("processToken: With nettyServer: " + saslNettyServer + + " and token length: " + token.length); + } + token = saslNettyServer.response(token); + if (LOG.isDebugEnabled()) { + LOG.debug("processToken: Response token's length is:" + token.length); + } + } + + @Override + public void writeRequest(DataOutput output) throws IOException { + output.writeInt(token.length); + output.write(token); + } +}
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1396722&r1=1396721&r2=1396722&view=diff ============================================================================== --- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original) +++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Wed Oct 10 18:35:31 2012 @@ -594,7 +594,11 @@ else[HADOOP_NON_SECURE]*/ workerGraphPartitioner.updatePartitionOwners( getWorkerInfo(), masterSetPartitionOwners, getPartitionStore()); +/*if[HADOOP_NON_SECURE] commService.setup(); +else[HADOOP_NON_SECURE]*/ + commService.setup(getConfiguration().authenticate()); +/*end[HADOOP_NON_SECURE]*/ // Ensure the InputSplits are ready for processing before processing while (true) { @@ -1037,11 +1041,11 @@ else[HADOOP_NON_SECURE]*/ } } catch (KeeperException e) { // Cleaning up, it's okay to fail after cleanup is successful - LOG.error("cleanup: Got KeeperException on notifcation " + + LOG.error("cleanup: Got KeeperException on notification " + "to master about cleanup", e); } catch (InterruptedException e) { // Cleaning up, it's okay to fail after cleanup is successful - LOG.error("cleanup: Got InterruptedException on notifcation " + + LOG.error("cleanup: Got InterruptedException on notification " + "to master about cleanup", e); } try { @@ -1239,7 +1243,7 @@ else[HADOOP_NON_SECURE]*/ ++loadedPartitions; } catch (IOException e) { throw new RuntimeException( - "loadCheckpoing: Failed to get partition owner " + + "loadCheckpoint: Failed to get partition owner " + partitionOwner, e); } } @@ -1269,7 +1273,11 @@ else[HADOOP_NON_SECURE]*/ // Communication service needs to setup the connections prior to // processing vertices +/*if[HADOOP_NON_SECURE] commService.setup(); +else[HADOOP_NON_SECURE]*/ + commService.setup(getConfiguration().authenticate()); +/*end[HADOOP_NON_SECURE]*/ } /** Added: giraph/trunk/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java?rev=1396722&view=auto ============================================================================== --- giraph/trunk/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java (added) +++ giraph/trunk/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java Wed Oct 10 18:35:31 2012 @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.comm; + +import com.google.common.collect.Maps; +import java.net.InetSocketAddress; +import java.util.Map; +import org.apache.giraph.GiraphConfiguration; +import org.apache.giraph.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.comm.messages.SimpleMessageStore; +import org.apache.giraph.comm.netty.NettyClient; +import org.apache.giraph.comm.netty.NettyServer; +import org.apache.giraph.comm.netty.handler.SaslServerHandler; +import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler; +import org.apache.giraph.graph.EdgeListVertex; +import org.apache.giraph.utils.MockUtils; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.mapreduce.Mapper.Context; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; + +/** + * Netty connection with mocked authentication. + */ +public class SaslConnectionTest { + /** Class configuration */ + private ImmutableClassesGiraphConfiguration conf; + + public static class IntVertex extends EdgeListVertex<IntWritable, + IntWritable, IntWritable, IntWritable> { + @Override + public void compute(Iterable<IntWritable> messages) throws IOException { + } + } + + @Before + public void setUp() { + GiraphConfiguration tmpConfig = new GiraphConfiguration(); + tmpConfig.setVertexClass(IntVertex.class); + tmpConfig.setBoolean(GiraphConfiguration.AUTHENTICATE, true); + conf = new ImmutableClassesGiraphConfiguration(tmpConfig); + } + + /** + * Test connecting a single client to a single server. + * + * @throws IOException + */ + @Test + public void connectSingleClientServer() throws IOException { + @SuppressWarnings("rawtypes") + Context context = mock(Context.class); + when(context.getConfiguration()).thenReturn(conf); + + ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData = + new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>( + conf, + SimpleMessageStore.newFactory( + MockUtils.mockServiceGetVertexPartitionOwner(1), conf), + context); + + SaslServerHandler.Factory mockedSaslServerFactory = + Mockito.mock(SaslServerHandler.Factory.class); + + SaslServerHandler mockedSaslServerHandler = + Mockito.mock(SaslServerHandler.class); + when(mockedSaslServerFactory.newHandler(conf)). + thenReturn(mockedSaslServerHandler); + + NettyServer server = + new NettyServer(conf, + new WorkerRequestServerHandler.Factory(serverData), + mockedSaslServerFactory); + server.start(); + + NettyClient client = new NettyClient(context, conf); + Map<InetSocketAddress, Integer> addressIdMap = Maps.newHashMap(); + addressIdMap.put(server.getMyAddress(), -1); + client.connectAllAddresses(addressIdMap); + + client.stop(); + server.stop(); + } +}
