wonook commented on a change in pull request #45: [NEMO-103] Implement RPC between Client and Driver
URL: https://github.com/apache/incubator-nemo/pull/45#discussion_r195986628
 
 

 ##########
 File path: runtime/driver/src/main/java/edu/snu/nemo/driver/ClientRPC.java
 ##########
 @@ -0,0 +1,147 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.driver;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import edu.snu.nemo.conf.JobConf;
+import edu.snu.nemo.runtime.common.comm.ControlMessage;
+import org.apache.reef.tang.InjectionFuture;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.impl.SyncStage;
+import org.apache.reef.wake.remote.Encoder;
+import org.apache.reef.wake.remote.address.LocalAddressProvider;
+import org.apache.reef.wake.remote.impl.TransportEvent;
+import org.apache.reef.wake.remote.transport.Link;
+import org.apache.reef.wake.remote.transport.LinkListener;
+import org.apache.reef.wake.remote.transport.Transport;
+import org.apache.reef.wake.remote.transport.TransportFactory;
+
+import javax.inject.Inject;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+
+/**
+ * Driver-side RPC implementation to NemoClient.
+ */
+public final class ClientRPC {
+  /** Encoder used for every outgoing driver-to-client message. */
+  private static final DriverToClientMessageEncoder ENCODER = new DriverToClientMessageEncoder();
+  /** Listener handed to the transport when the link to the client is opened. */
+  private static final ClientRPCLinkListener LINK_LISTENER = new ClientRPCLinkListener();
+  /** Retry count passed to the transport factory. */
+  private static final int RETRY_COUNT = 10;
+  /** Retry timeout passed to the transport factory (presumably milliseconds - TODO confirm against REEF Wake). */
+  private static final int RETRY_TIMEOUT = 100;
+
+  /** Transport created in the constructor and closed by {@link #shutdown()}. */
+  private final Transport transport;
+  /** Link to the client over which {@link #send} writes messages. */
+  private final Link<ControlMessage.DriverToClientMessage> link;
+  /** Deferred handle to the driver; resolved with {@code get()} when a client message is handled. */
+  private final InjectionFuture<NemoDriver> nemoDriver;
+  /** Set to {@code true} once {@link #shutdown()} has run, whether or not closing the transport succeeded. */
+  private volatile boolean isClosed = false;
+
+  /**
+   * Creates the driver-side RPC endpoint and opens a link to the client.
+   *
+   * A transport is created on the local address (0 is passed as the port argument) with
+   * {@link RPCEventHandler} receiving incoming events, and a link to the client-side RPC server is
+   * then opened. If opening the link fails, the freshly created transport is closed before the
+   * failure is propagated, so that it is not leaked.
+   *
+   * @param transportFactory     factory used to create the transport.
+   * @param localAddressProvider provider of the local address the transport is created on.
+   * @param nemoDriver           deferred handle to the driver that handles client requests.
+   * @param clientHost           host of the client-side RPC server.
+   * @param clientPort           port of the client-side RPC server.
+   * @throws IOException if the link to the client cannot be opened.
+   */
+  @Inject
+  private ClientRPC(final TransportFactory transportFactory,
+                    final LocalAddressProvider localAddressProvider,
+                    final InjectionFuture<NemoDriver> nemoDriver,
+                    @Parameter(JobConf.ClientSideRPCServerHost.class) final String clientHost,
+                    @Parameter(JobConf.ClientSideRPCServerPort.class) final int clientPort) throws IOException {
+    this.nemoDriver = nemoDriver;
+    transport = transportFactory.newInstance(localAddressProvider.getLocalAddress(),
+        0, new SyncStage<>(new RPCEventHandler()), null, RETRY_COUNT, RETRY_TIMEOUT);
+    final SocketAddress clientAddress = new InetSocketAddress(clientHost, clientPort);
+    try {
+      link = transport.open(clientAddress, ENCODER, LINK_LISTENER);
+    } catch (final IOException | RuntimeException e) {
+      // Nobody else holds a reference to the transport yet, so it must be released here.
+      try {
+        transport.close();
+      } catch (final Exception closeFailure) {
+        // Keep the original failure as the primary cause; record the close failure alongside it.
+        e.addSuppressed(closeFailure);
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * Closes the underlying transport and marks this ClientRPC as closed.
+   * The closed flag is set even when closing the transport fails.
+   *
+   * @throws RuntimeException if this ClientRPC was already closed, or wrapping any exception
+   *                          raised while closing the transport.
+   */
+  public void shutdown() {
+    ensureRunning();
+    try {
+      transport.close();
+    } catch (final Exception closeFailure) {
+      throw new RuntimeException(closeFailure);
+    } finally {
+      // Flag as closed regardless of the outcome so that later calls are rejected by ensureRunning().
+      isClosed = true;
+    }
+  }
+
+  /**
+   * Writes a message to the client over the link opened at construction time.
+   *
+   * @param message the driver-to-client message to send.
+   * @throws RuntimeException if this ClientRPC has already been closed.
+   */
+  public void send(final ControlMessage.DriverToClientMessage message) {
+    ensureRunning();
+    link.write(message);
+  }
+
+  /**
+   * Dispatches a message received from the client.
+   * Only {@code LaunchDAG} messages are acted upon: the DAG they carry is passed to the driver
+   * for scheduling. Messages of any other type are ignored.
+   *
+   * @param message the client-to-driver message to process.
+   */
+  private void handleMessage(final ControlMessage.ClientToDriverMessage message) {
+    if (message.getType() != ControlMessage.ClientToDriverMessageType.LaunchDAG) {
+      return;
+    }
+    nemoDriver.get().startSchedulingUserApplication(message.getLaunchDAG().getDag());
+  }
+
+  /**
+   * Event handler for transport events carrying messages from the client.
+   * Each event payload is parsed as a {@code ClientToDriverMessage} and passed to
+   * {@link #handleMessage}.
+   */
+  private final class RPCEventHandler implements EventHandler<TransportEvent> {
+    @Override
+    public void onNext(final TransportEvent transportEvent) {
+      final ControlMessage.ClientToDriverMessage parsed;
+      try {
+        parsed = ControlMessage.ClientToDriverMessage.parseFrom(transportEvent.getData());
+      } catch (final InvalidProtocolBufferException e) {
+        // A payload that is not a valid ClientToDriverMessage is surfaced as an unchecked failure.
+        throw new RuntimeException(e);
+      }
+      handleMessage(parsed);
+    }
+  }
+
+  /**
+   * Verifies that this ClientRPC has not been shut down yet.
+   *
+   * @throws IllegalStateException if {@link #shutdown()} has already been invoked.
+   */
+  private void ensureRunning() {
+    if (isClosed) {
+      // IllegalStateException is a RuntimeException, so callers catching the latter are unaffected.
+      throw new IllegalStateException("The ClientRPC is already closed");
+    }
+  }
+
+  /**
+   * Provides encoder for {@link 
edu.snu.nemo.runtime.common.comm.ControlMessage.DriverToClientMessage}.
+   */
+  private static final class DriverToClientMessageEncoder implements 
Encoder<ControlMessage.DriverToClientMessage> {
+    @Override
+    public byte[] encode(final ControlMessage.DriverToClientMessage 
driverToClientMessage) {
+      return driverToClientMessage.toByteArray();
+    }
+  }
+
+  /**
+   * Provides {@link LinkListener}.
+   */
+  private static final class ClientRPCLinkListener implements 
LinkListener<ControlMessage.DriverToClientMessage> {
+
+    @Override
+    public void onSuccess(final ControlMessage.DriverToClientMessage 
driverToClientMessage) {
 
 Review comment:
   What is this method (`onSuccess`) for?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to