wonook commented on a change in pull request #45: [NEMO-103] Implement RPC between Client and Driver
URL: https://github.com/apache/incubator-nemo/pull/45#discussion_r195986628
########## File path: runtime/driver/src/main/java/edu/snu/nemo/driver/ClientRPC.java ########## @@ -0,0 +1,147 @@ +/* + * Copyright (C) 2018 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.nemo.driver; + +import com.google.protobuf.InvalidProtocolBufferException; +import edu.snu.nemo.conf.JobConf; +import edu.snu.nemo.runtime.common.comm.ControlMessage; +import org.apache.reef.tang.InjectionFuture; +import org.apache.reef.tang.annotations.Parameter; +import org.apache.reef.wake.EventHandler; +import org.apache.reef.wake.impl.SyncStage; +import org.apache.reef.wake.remote.Encoder; +import org.apache.reef.wake.remote.address.LocalAddressProvider; +import org.apache.reef.wake.remote.impl.TransportEvent; +import org.apache.reef.wake.remote.transport.Link; +import org.apache.reef.wake.remote.transport.LinkListener; +import org.apache.reef.wake.remote.transport.Transport; +import org.apache.reef.wake.remote.transport.TransportFactory; + +import javax.inject.Inject; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; + +/** + * Driver-side RPC implementation to NemoClient. 
+ */ +public final class ClientRPC { + private final Transport transport; + private final Link<ControlMessage.DriverToClientMessage> link; + private final InjectionFuture<NemoDriver> nemoDriver; + private volatile boolean isClosed = false; + private static final DriverToClientMessageEncoder ENCODER = new DriverToClientMessageEncoder(); + private static final ClientRPCLinkListener LINK_LISTENER = new ClientRPCLinkListener(); + private static final int RETRY_COUNT = 10; + private static final int RETRY_TIMEOUT = 100; + @Inject + private ClientRPC(final TransportFactory transportFactory, + final LocalAddressProvider localAddressProvider, + final InjectionFuture<NemoDriver> nemoDriver, + @Parameter(JobConf.ClientSideRPCServerHost.class) final String clientHost, + @Parameter(JobConf.ClientSideRPCServerPort.class) final int clientPort) throws IOException { + this.nemoDriver = nemoDriver; + transport = transportFactory.newInstance(localAddressProvider.getLocalAddress(), + 0, new SyncStage<>(new RPCEventHandler()), null, RETRY_COUNT, RETRY_TIMEOUT); + final SocketAddress clientAddress = new InetSocketAddress(clientHost, clientPort); + link = transport.open(clientAddress, ENCODER, LINK_LISTENER); + } + + /** + * Shuts down the transport. + */ + public void shutdown() { + ensureRunning(); + try { + transport.close(); + } catch (final Exception e) { + throw new RuntimeException(e); + } finally { + isClosed = true; + } + } + + /** + * Write message to client. + * @param message message to send. + */ + public void send(final ControlMessage.DriverToClientMessage message) { + ensureRunning(); + link.write(message); + } + + /** + * Handles message from client. 
+ * @param message message to process + */ + private void handleMessage(final ControlMessage.ClientToDriverMessage message) { + if (message.getType() == ControlMessage.ClientToDriverMessageType.LaunchDAG) { + nemoDriver.get().startSchedulingUserApplication(message.getLaunchDAG().getDag()); + } + } + + /** + * Provides event handler for messages from client. + */ + private final class RPCEventHandler implements EventHandler<TransportEvent> { + @Override + public void onNext(final TransportEvent transportEvent) { + try { + final byte[] data = transportEvent.getData(); + final ControlMessage.ClientToDriverMessage message = ControlMessage.ClientToDriverMessage.parseFrom(data); + handleMessage(message); + } catch (final InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + } + } + + /** + * Ensure the Transport is running. + */ + private void ensureRunning() { + if (isClosed) { + throw new RuntimeException("The ClientRPC is already closed"); + } + } + + /** + * Provides encoder for {@link edu.snu.nemo.runtime.common.comm.ControlMessage.DriverToClientMessage}. + */ + private static final class DriverToClientMessageEncoder implements Encoder<ControlMessage.DriverToClientMessage> { + @Override + public byte[] encode(final ControlMessage.DriverToClientMessage driverToClientMessage) { + return driverToClientMessage.toByteArray(); + } + } + + /** + * Provides {@link LinkListener}. + */ + private static final class ClientRPCLinkListener implements LinkListener<ControlMessage.DriverToClientMessage> { + + @Override + public void onSuccess(final ControlMessage.DriverToClientMessage driverToClientMessage) { Review comment: What's the method for? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services