This is an automated email from the ASF dual-hosted git repository.

wwei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 085f0e8  YARN-9086. [CSI] Run csi-driver-adaptor as aux service. 
Contributed by Weiwei Yang.
085f0e8 is described below

commit 085f0e8ae7f43b42bd3d81ad11322ac8f8d664fd
Author: Weiwei Yang <w...@apache.org>
AuthorDate: Tue Jan 29 14:53:08 2019 +0800

    YARN-9086. [CSI] Run csi-driver-adaptor as aux service. Contributed by 
Weiwei Yang.
---
 .../apache/hadoop/yarn/api/CsiAdaptorPlugin.java   |  50 ++++
 .../apache/hadoop/yarn/conf/YarnConfiguration.java |   2 +
 .../src/main/resources/yarn-default.xml            |  10 +-
 .../hadoop/yarn/csi/adaptor/CsiAdaptorFactory.java |  73 +++++
 .../csi/adaptor/CsiAdaptorProtocolService.java     | 100 +------
 .../yarn/csi/adaptor/CsiAdaptorServices.java       | 108 +++++++
 ...ocolService.java => DefaultCsiAdaptorImpl.java} |  92 ++----
 .../hadoop/yarn/csi/adaptor/MockCsiAdaptor.java    |  85 ++++++
 .../yarn/csi/adaptor/TestCsiAdaptorService.java    | 326 ++++++++++++++++-----
 9 files changed, 613 insertions(+), 233 deletions(-)

diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/CsiAdaptorPlugin.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/CsiAdaptorPlugin.java
new file mode 100644
index 0000000..26b45f7
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/CsiAdaptorPlugin.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+/**
+ * csi-adaptor is a plugin, user can provide customized implementation
+ * according to this interface. NM will init and load this into a NM aux
+ * service, and it can run multiple csi-adaptor servers.
+ *
+ * User needs to implement all the methods defined in
+ * {@link CsiAdaptorProtocol}, and plus the methods in this interface.
+ */
+public interface CsiAdaptorPlugin extends CsiAdaptorProtocol {
+
+  /**
+   * A csi-adaptor implementation can init its state within this function.
+   * Configuration is available so the implementation can retrieve some
+   * customized configuration from yarn-site.xml.
+   * @param driverName the name of the csi-driver.
+   * @param conf configuration.
+   * @throws YarnException
+   */
+  void init(String driverName, Configuration conf) throws YarnException;
+
+  /**
+   * Returns the driver name of the csi-driver this adaptor works with.
+   * The name should be consistent on all the places being used, ideally
+   * it should come from the value when init is done.
+   * @return the name of the csi-driver that this adaptor works with.
+   */
+  String getDriverName();
+}
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 2be73e1..de66e75 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -3489,6 +3489,8 @@ public class YarnConfiguration extends Configuration {
       ".endpoint";
   public static final String NM_CSI_ADAPTOR_ADDRESS_SUFFIX =
       ".address";
+  public static final String NM_CSI_ADAPTOR_CLASS =
+      ".class";
   /**
    * One or more socket addresses for csi-adaptor.
    * Multiple addresses are delimited by ",".
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 2508c48..45894e9 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -4096,12 +4096,20 @@
     <description>
       CSI driver names running on this node, multiple driver names need to
       be delimited by comma. The driver name should be same value returned
-      by the getPluginInfo call. For each of the CSI driver name, it must
+      by the getPluginInfo call.For each of the CSI driver name, it must
       to define following two corresponding properties:
         "yarn.nodemanager.csi-driver.${NAME}.endpoint"
         "yarn.nodemanager.csi-driver-adaptor.${NAME}.address"
       The 1st property defines where the driver's endpoint is;
       2nd property defines where the mapping csi-driver-adaptor's address is.
+      What's more, an optional csi-driver-adaptor class can be defined
+      for each csi-driver:
+        "yarn.nodemanager.csi-driver.${NAME}.class"
+      once given, the adaptor will be initiated with the given class instead
+      of the default implementation
+      org.apache.hadoop.yarn.csi.adaptor.DefaultCsiAdaptorImpl. User can plug
+      customized adaptor code for csi-driver with this configuration
+      if necessary.
     </description>
     <name>yarn.nodemanager.csi-driver.names</name>
     <value></value>
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/CsiAdaptorFactory.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/CsiAdaptorFactory.java
new file mode 100644
index 0000000..955c36f
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/CsiAdaptorFactory.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.csi.adaptor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.api.CsiAdaptorPlugin;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Desired csi-adaptor implementation is configurable, default to
+ * CsiAdaptorProtocolService. If user wants to have a different implementation,
+ * just to configure a different class for the csi-driver.
+ */
+public final class CsiAdaptorFactory {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(CsiAdaptorFactory.class);
+
+  private CsiAdaptorFactory() {
+    // hide constructor for this factory class.
+  }
+
+  /**
+   * Load csi-driver-adaptor from configuration. If the configuration is not
+   * specified, the default implementation
+   * for the adaptor is {@link DefaultCsiAdaptorImpl}. If the configured class
+   * is not a valid variation of {@link CsiAdaptorPlugin} or the class cannot
+   * be found, this function will throw a RuntimeException.
+   * @param driverName
+   * @param conf
+   * @return CsiAdaptorPlugin
+   * @throws YarnException if unable to create the adaptor class.
+   * @throws RuntimeException if given class is not found or not
+   *   an instance of {@link CsiAdaptorPlugin}
+   */
+  public static CsiAdaptorPlugin getAdaptor(String driverName,
+      Configuration conf) throws YarnException {
+    // load configuration
+    String configName = YarnConfiguration.NM_CSI_ADAPTOR_PREFIX
+        + driverName + YarnConfiguration.NM_CSI_ADAPTOR_CLASS;
+    Class<? extends CsiAdaptorPlugin> impl = conf.getClass(configName,
+        DefaultCsiAdaptorImpl.class, CsiAdaptorPlugin.class);
+    if (impl == null) {
+      throw new YarnException("Unable to init csi-adaptor from the"
+          + " class specified via " + configName);
+    }
+
+    // init the adaptor
+    CsiAdaptorPlugin instance = ReflectionUtils.newInstance(impl, conf);
+    LOG.info("csi-adaptor initiated, implementation: "
+        + impl.getCanonicalName());
+    return instance;
+  }
+}
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/CsiAdaptorProtocolService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/CsiAdaptorProtocolService.java
index 7020f06..300c5bc 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/CsiAdaptorProtocolService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/CsiAdaptorProtocolService.java
@@ -17,11 +17,11 @@
  */
 package org.apache.hadoop.yarn.csi.adaptor;
 
-import com.google.common.annotations.VisibleForTesting;
-import csi.v0.Csi;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.CsiAdaptorProtocol;
+import org.apache.hadoop.yarn.api.CsiAdaptorPlugin;
 import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeRequest;
@@ -30,27 +30,20 @@ import 
org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeResponse;
 import 
org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest;
 import 
org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse;
-import org.apache.hadoop.yarn.csi.client.CsiClient;
-import org.apache.hadoop.yarn.csi.client.CsiClientImpl;
-import org.apache.hadoop.yarn.csi.translator.ProtoTranslatorFactory;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
-import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
-import org.apache.hadoop.yarn.server.api.AuxiliaryService;
 import org.apache.hadoop.yarn.util.csi.CsiConfigUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
 
 /**
  * This is a Hadoop RPC server, we uses the Hadoop RPC framework here
  * because we need to stick to the security model current Hadoop supports.
  */
-public class CsiAdaptorProtocolService extends AuxiliaryService
+public class CsiAdaptorProtocolService extends AbstractService
     implements CsiAdaptorProtocol {
 
   private static final Logger LOG =
@@ -58,35 +51,17 @@ public class CsiAdaptorProtocolService extends 
AuxiliaryService
 
   private Server server;
   private InetSocketAddress adaptorServiceAddress;
-  private CsiClient csiClient;
-  private String csiDriverName;
+  private CsiAdaptorPlugin serverImpl;
 
-  public CsiAdaptorProtocolService() {
+  public CsiAdaptorProtocolService(CsiAdaptorPlugin adaptorImpl) {
     super(CsiAdaptorProtocolService.class.getName());
-    // TODO read this from configuration
-    this.csiDriverName =  "ch.ctrox.csi.s3-driver";
-  }
-
-  public CsiAdaptorProtocolService(String driverName,
-      String domainSocketPath) {
-    super(CsiAdaptorProtocolService.class.getName());
-    this.csiClient = new CsiClientImpl(domainSocketPath);
-    this.csiDriverName = driverName;
-  }
-
-  @VisibleForTesting
-  public void setCsiClient(CsiClient client) {
-    this.csiClient = client;
+    this.serverImpl = adaptorImpl;
   }
 
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
-
-    String driverEndpoint = CsiConfigUtils
-        .getCsiDriverEndpoint(csiDriverName, conf);
-    this.csiClient = new CsiClientImpl(driverEndpoint);
     adaptorServiceAddress = CsiConfigUtils
-        .getCsiAdaptorAddressForDriver(csiDriverName, conf);
+        .getCsiAdaptorAddressForDriver(serverImpl.getDriverName(), conf);
     super.serviceInit(conf);
   }
 
@@ -96,7 +71,7 @@ public class CsiAdaptorProtocolService extends 
AuxiliaryService
     YarnRPC rpc = YarnRPC.create(conf);
     this.server = rpc.getServer(
         CsiAdaptorProtocol.class,
-        this, adaptorServiceAddress, conf, null, 1);
+        serverImpl, adaptorServiceAddress, conf, null, 1);
     this.server.start();
     LOG.info("{} started, listening on address: {}",
         CsiAdaptorProtocolService.class.getName(),
@@ -115,76 +90,25 @@ public class CsiAdaptorProtocolService extends 
AuxiliaryService
   @Override
   public GetPluginInfoResponse getPluginInfo(
       GetPluginInfoRequest request) throws YarnException, IOException {
-    Csi.GetPluginInfoResponse response = csiClient.getPluginInfo();
-    return ProtoTranslatorFactory.getTranslator(
-        GetPluginInfoResponse.class, Csi.GetPluginInfoResponse.class)
-        .convertFrom(response);
+    return serverImpl.getPluginInfo(request);
   }
 
   @Override
   public ValidateVolumeCapabilitiesResponse validateVolumeCapacity(
       ValidateVolumeCapabilitiesRequest request) throws YarnException,
       IOException {
-    Csi.ValidateVolumeCapabilitiesRequest req = ProtoTranslatorFactory
-        .getTranslator(ValidateVolumeCapabilitiesRequest.class,
-            Csi.ValidateVolumeCapabilitiesRequest.class)
-        .convertTo(request);
-    Csi.ValidateVolumeCapabilitiesResponse response =
-        csiClient.validateVolumeCapabilities(req);
-    return ProtoTranslatorFactory.getTranslator(
-        ValidateVolumeCapabilitiesResponse.class,
-        Csi.ValidateVolumeCapabilitiesResponse.class)
-        .convertFrom(response);
+    return serverImpl.validateVolumeCapacity(request);
   }
 
   @Override
   public NodePublishVolumeResponse nodePublishVolume(
       NodePublishVolumeRequest request) throws YarnException, IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Received nodePublishVolume call, request: {}",
-          request.toString());
-    }
-    Csi.NodePublishVolumeRequest req = ProtoTranslatorFactory
-        .getTranslator(NodePublishVolumeRequest.class,
-            Csi.NodePublishVolumeRequest.class).convertTo(request);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Translate to CSI proto message: {}", req.toString());
-    }
-    csiClient.nodePublishVolume(req);
-    return NodePublishVolumeResponse.newInstance();
+    return serverImpl.nodePublishVolume(request);
   }
 
   @Override
   public NodeUnpublishVolumeResponse nodeUnpublishVolume(
       NodeUnpublishVolumeRequest request) throws YarnException, IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Received nodeUnpublishVolume call, request: {}",
-          request.toString());
-    }
-    Csi.NodeUnpublishVolumeRequest req = ProtoTranslatorFactory
-        .getTranslator(NodeUnpublishVolumeRequest.class,
-            Csi.NodeUnpublishVolumeRequest.class).convertTo(request);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Translate to CSI proto message: {}", req.toString());
-    }
-    csiClient.nodeUnpublishVolume(req);
-    return NodeUnpublishVolumeResponse.newInstance();
-  }
-
-  @Override
-  public void initializeApplication(
-      ApplicationInitializationContext initAppContext) {
-    // do nothing
-  }
-
-  @Override
-  public void stopApplication(
-      ApplicationTerminationContext stopAppContext) {
-    // do nothing
-  }
-
-  @Override
-  public ByteBuffer getMetaData() {
-    return ByteBuffer.allocate(0);
+    return serverImpl.nodeUnpublishVolume(request);
   }
 }
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/CsiAdaptorServices.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/CsiAdaptorServices.java
new file mode 100644
index 0000000..78debbe
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/CsiAdaptorServices.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.csi.adaptor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.CsiAdaptorPlugin;
+import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
+import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
+import org.apache.hadoop.yarn.server.api.AuxiliaryService;
+import org.apache.hadoop.yarn.util.csi.CsiConfigUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * NM manages csi-adaptors as a single NM AUX service, this service
+ * manages a set of rpc services and each of them serves one particular
+ * csi-driver. It loads all available drivers from configuration, and
+ * find a csi-driver-adaptor implementation class for each of them. At last
+ * it brings up all of them as a composite service.
+ */
+public class CsiAdaptorServices extends AuxiliaryService {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(CsiAdaptorServices.class);
+
+  private List<CsiAdaptorProtocolService> serviceList;
+  protected CsiAdaptorServices() {
+    super(CsiAdaptorServices.class.getName());
+    serviceList = new ArrayList<>();
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    // load configuration and init adaptors
+    String[] names = CsiConfigUtils.getCsiDriverNames(conf);
+    if (names != null && names.length > 0) {
+      for (String driverName : names) {
+        LOG.info("Adding csi-driver-adaptor for csi-driver {}", driverName);
+        CsiAdaptorPlugin serviceImpl = CsiAdaptorFactory
+            .getAdaptor(driverName, conf);
+        serviceImpl.init(driverName, conf);
+        CsiAdaptorProtocolService service =
+            new CsiAdaptorProtocolService(serviceImpl);
+        serviceList.add(service);
+        service.serviceInit(conf);
+      }
+    }
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    if (serviceList != null && serviceList.size() > 0) {
+      for (CsiAdaptorProtocolService service : serviceList) {
+        try {
+          service.serviceStop();
+        } catch (Exception e) {
+          LOG.warn("Unable to stop service " + service.getName(), e);
+        }
+      }
+    }
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    if (serviceList != null && serviceList.size() > 0) {
+      for (CsiAdaptorProtocolService service : serviceList) {
+        service.serviceStart();
+      }
+    }
+  }
+
+  @Override
+  public void initializeApplication(
+      ApplicationInitializationContext initAppContext) {
+    // do nothing
+  }
+
+  @Override
+  public void stopApplication(
+      ApplicationTerminationContext stopAppContext) {
+    // do nothing
+  }
+
+  @Override
+  public ByteBuffer getMetaData() {
+    return ByteBuffer.allocate(0);
+  }
+}
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/CsiAdaptorProtocolService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/DefaultCsiAdaptorImpl.java
similarity index 63%
copy from 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/CsiAdaptorProtocolService.java
copy to 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/DefaultCsiAdaptorImpl.java
index 7020f06..a203587 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/CsiAdaptorProtocolService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/DefaultCsiAdaptorImpl.java
@@ -17,11 +17,9 @@
  */
 package org.apache.hadoop.yarn.csi.adaptor;
 
-import com.google.common.annotations.VisibleForTesting;
 import csi.v0.Csi;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ipc.Server;
-import org.apache.hadoop.yarn.api.CsiAdaptorProtocol;
+import org.apache.hadoop.yarn.api.CsiAdaptorPlugin;
 import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeRequest;
@@ -34,82 +32,43 @@ import org.apache.hadoop.yarn.csi.client.CsiClient;
 import org.apache.hadoop.yarn.csi.client.CsiClientImpl;
 import org.apache.hadoop.yarn.csi.translator.ProtoTranslatorFactory;
 import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
-import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
-import org.apache.hadoop.yarn.server.api.AuxiliaryService;
 import org.apache.hadoop.yarn.util.csi.CsiConfigUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
 
 /**
- * This is a Hadoop RPC server, we uses the Hadoop RPC framework here
- * because we need to stick to the security model current Hadoop supports.
+ * The default implementation of csi-driver-adaptor service.
  */
-public class CsiAdaptorProtocolService extends AuxiliaryService
-    implements CsiAdaptorProtocol {
+public class DefaultCsiAdaptorImpl implements CsiAdaptorPlugin {
 
   private static final Logger LOG =
-      LoggerFactory.getLogger(CsiAdaptorProtocolService.class);
+      LoggerFactory.getLogger(DefaultCsiAdaptorImpl.class);
 
-  private Server server;
-  private InetSocketAddress adaptorServiceAddress;
+  private String driverName;
   private CsiClient csiClient;
-  private String csiDriverName;
 
-  public CsiAdaptorProtocolService() {
-    super(CsiAdaptorProtocolService.class.getName());
-    // TODO read this from configuration
-    this.csiDriverName =  "ch.ctrox.csi.s3-driver";
-  }
-
-  public CsiAdaptorProtocolService(String driverName,
-      String domainSocketPath) {
-    super(CsiAdaptorProtocolService.class.getName());
-    this.csiClient = new CsiClientImpl(domainSocketPath);
-    this.csiDriverName = driverName;
-  }
-
-  @VisibleForTesting
-  public void setCsiClient(CsiClient client) {
-    this.csiClient = client;
+  public DefaultCsiAdaptorImpl() {
+    // use default constructor for reflection
   }
 
   @Override
-  protected void serviceInit(Configuration conf) throws Exception {
-
+  public void init(String driverName, Configuration conf)
+      throws YarnException {
+    // if the driver end point is invalid, following code will fail.
     String driverEndpoint = CsiConfigUtils
-        .getCsiDriverEndpoint(csiDriverName, conf);
+        .getCsiDriverEndpoint(driverName, conf);
+    LOG.info("This csi-adaptor is configured to contact with"
+            + " the csi-driver {} via gRPC endpoint: {}",
+        driverName, driverEndpoint);
     this.csiClient = new CsiClientImpl(driverEndpoint);
-    adaptorServiceAddress = CsiConfigUtils
-        .getCsiAdaptorAddressForDriver(csiDriverName, conf);
-    super.serviceInit(conf);
+    this.driverName = driverName;
   }
 
   @Override
-  protected void serviceStart() throws Exception {
-    Configuration conf = getConfig();
-    YarnRPC rpc = YarnRPC.create(conf);
-    this.server = rpc.getServer(
-        CsiAdaptorProtocol.class,
-        this, adaptorServiceAddress, conf, null, 1);
-    this.server.start();
-    LOG.info("{} started, listening on address: {}",
-        CsiAdaptorProtocolService.class.getName(),
-        adaptorServiceAddress.toString());
-    super.serviceStart();
-  }
-
-  @Override
-  protected void serviceStop() throws Exception {
-    if (this.server != null) {
-      this.server.stop();
-    }
-    super.serviceStop();
+  public String getDriverName() {
+    return driverName;
   }
 
   @Override
@@ -170,21 +129,4 @@ public class CsiAdaptorProtocolService extends 
AuxiliaryService
     csiClient.nodeUnpublishVolume(req);
     return NodeUnpublishVolumeResponse.newInstance();
   }
-
-  @Override
-  public void initializeApplication(
-      ApplicationInitializationContext initAppContext) {
-    // do nothing
-  }
-
-  @Override
-  public void stopApplication(
-      ApplicationTerminationContext stopAppContext) {
-    // do nothing
-  }
-
-  @Override
-  public ByteBuffer getMetaData() {
-    return ByteBuffer.allocate(0);
-  }
 }
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/MockCsiAdaptor.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/MockCsiAdaptor.java
new file mode 100644
index 0000000..4bcc509
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/MockCsiAdaptor.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.csi.adaptor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.CsiAdaptorPlugin;
+import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeResponse;
+import 
org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest;
+import 
org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+import java.io.IOException;
+
+/**
+ * This class is used by {@link TestCsiAdaptorService} for testing.
+ * It gives some dummy implementation for a adaptor plugin, and used to
+ * verify the plugin can be properly loaded by NM and execution logic is
+ * as expected.
+ *
+ * This is created as a separated class instead of an inner class, because
+ * {@link CsiAdaptorServices} is loading classes using conf.getClass(),
+ * the utility class is unable to resolve inner classes.
+ */
+public class MockCsiAdaptor implements CsiAdaptorPlugin {
+
+  private String driverName;
+
+  @Override
+  public void init(String driverName, Configuration conf)
+      throws YarnException {
+    this.driverName = driverName;
+  }
+
+  @Override
+  public String getDriverName() {
+    return this.driverName;
+  }
+
+  @Override
+  public GetPluginInfoResponse getPluginInfo(
+      GetPluginInfoRequest request) throws YarnException, IOException {
+    return GetPluginInfoResponse.newInstance(driverName,
+        "1.0");
+  }
+
+  @Override
+  public ValidateVolumeCapabilitiesResponse validateVolumeCapacity(
+      ValidateVolumeCapabilitiesRequest request)
+      throws YarnException, IOException {
+    return ValidateVolumeCapabilitiesResponse.newInstance(true,
+        "verified via MockCsiAdaptor");
+  }
+
+  @Override
+  public NodePublishVolumeResponse nodePublishVolume(
+      NodePublishVolumeRequest request) throws YarnException, IOException {
+    return null;
+  }
+
+  @Override
+  public NodeUnpublishVolumeResponse nodeUnpublishVolume(
+      NodeUnpublishVolumeRequest request) throws YarnException, IOException {
+    return null;
+  }
+}
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestCsiAdaptorService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestCsiAdaptorService.java
index d6ee231..c415ced 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestCsiAdaptorService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestCsiAdaptorService.java
@@ -27,13 +27,19 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.ServiceStateException;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.yarn.api.CsiAdaptorProtocol;
+import org.apache.hadoop.yarn.api.CsiAdaptorPlugin;
 import 
org.apache.hadoop.yarn.api.impl.pb.client.CsiAdaptorProtocolPBClientImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeResponse;
 import 
org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest;
 import 
org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse;
 import 
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ValidateVolumeCapabilitiesRequestPBImpl;
 import org.apache.hadoop.yarn.client.NMProxy;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.csi.client.ICsiClientTest;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.junit.AfterClass;
@@ -72,6 +78,39 @@ public class TestCsiAdaptorService {
     }
   }
 
+  private interface FakeCsiAdaptor extends CsiAdaptorPlugin {
+
+    default void init(String driverName, Configuration conf)
+        throws YarnException {
+      return;
+    }
+
+    default String getDriverName() {
+      return null;
+    }
+
+    default GetPluginInfoResponse getPluginInfo(GetPluginInfoRequest request)
+        throws YarnException, IOException {
+      return null;
+    }
+
+    default ValidateVolumeCapabilitiesResponse validateVolumeCapacity(
+        ValidateVolumeCapabilitiesRequest request) throws YarnException,
+        IOException {
+      return null;
+    }
+
+    default NodePublishVolumeResponse nodePublishVolume(
+        NodePublishVolumeRequest request) throws YarnException, IOException {
+      return null;
+    }
+
+    default NodeUnpublishVolumeResponse nodeUnpublishVolume(
+        NodeUnpublishVolumeRequest request) throws YarnException, IOException{
+      return null;
+    }
+  }
+
   @Test
   public void testValidateVolume() throws IOException, YarnException {
     ServerSocket ss = new ServerSocket(0);
@@ -83,50 +122,52 @@ public class TestCsiAdaptorService {
         address);
     conf.set(
         YarnConfiguration.NM_CSI_DRIVER_PREFIX + "test-driver.endpoint",
-        "unix:///tmp/test-driver.scok");
-    CsiAdaptorProtocolService service =
-        new CsiAdaptorProtocolService("test-driver", domainSocket);
-    service.init(conf);
-    service.start();
+        "unix:///tmp/test-driver.sock");
 
-    // inject a fake CSI client
+    // inject a fake CSI adaptor
     // this client validates if the ValidateVolumeCapabilitiesRequest
     // is integrity, and then reply a fake response
-    service.setCsiClient(new ICsiClientTest() {
+    CsiAdaptorPlugin plugin = new FakeCsiAdaptor() {
+      @Override
+      public String getDriverName() {
+        return "test-driver";
+      }
+
+
       @Override
-      public Csi.GetPluginInfoResponse getPluginInfo() {
-        return Csi.GetPluginInfoResponse.newBuilder()
-            .setName("test-plugin")
-            .setVendorVersion("0.1")
-            .build();
+      public GetPluginInfoResponse getPluginInfo(GetPluginInfoRequest request) 
{
+        return GetPluginInfoResponse.newInstance("test-plugin", "0.1");
       }
 
       @Override
-      public Csi.ValidateVolumeCapabilitiesResponse validateVolumeCapabilities(
-          Csi.ValidateVolumeCapabilitiesRequest request) {
+      public ValidateVolumeCapabilitiesResponse validateVolumeCapacity(
+          ValidateVolumeCapabilitiesRequest request) throws YarnException,
+          IOException {
         // validate we get all info from the request
         Assert.assertEquals("volume-id-0000123", request.getVolumeId());
-        Assert.assertEquals(1, request.getVolumeCapabilitiesCount());
+        Assert.assertEquals(1, request.getVolumeCapabilities().size());
         Assert.assertEquals(Csi.VolumeCapability.AccessMode
-                .newBuilder().setModeValue(5).build(),
-            request.getVolumeCapabilities(0).getAccessMode());
-        Assert.assertTrue(request.getVolumeCapabilities(0).hasMount());
-        Assert.assertEquals(2, request.getVolumeCapabilities(0)
-            .getMount().getMountFlagsCount());
-        Assert.assertTrue(request.getVolumeCapabilities(0)
-            .getMount().getMountFlagsList().contains("mountFlag1"));
-        Assert.assertTrue(request.getVolumeCapabilities(0)
-            .getMount().getMountFlagsList().contains("mountFlag2"));
-        Assert.assertEquals(2, request.getVolumeAttributesCount());
-        Assert.assertEquals("v1", request.getVolumeAttributesMap().get("k1"));
-        Assert.assertEquals("v2", request.getVolumeAttributesMap().get("k2"));
+                .newBuilder().setModeValue(5).build().getMode().name(),
+            request.getVolumeCapabilities().get(0).getAccessMode().name());
+        Assert.assertEquals(2, request.getVolumeCapabilities().get(0)
+            .getMountFlags().size());
+        Assert.assertTrue(request.getVolumeCapabilities().get(0)
+            .getMountFlags().contains("mountFlag1"));
+        Assert.assertTrue(request.getVolumeCapabilities().get(0)
+            .getMountFlags().contains("mountFlag2"));
+        Assert.assertEquals(2, request.getVolumeAttributes().size());
+        Assert.assertEquals("v1", request.getVolumeAttributes().get("k1"));
+        Assert.assertEquals("v2", request.getVolumeAttributes().get("k2"));
         // return a fake result
-        return Csi.ValidateVolumeCapabilitiesResponse.newBuilder()
-            .setSupported(false)
-            .setMessage("this is a test")
-            .build();
+        return ValidateVolumeCapabilitiesResponse
+            .newInstance(false, "this is a test");
       }
-    });
+    };
+
+    CsiAdaptorProtocolService service =
+        new CsiAdaptorProtocolService(plugin);
+    service.init(conf);
+    service.start();
 
     try (CsiAdaptorProtocolPBClientImpl client =
         new CsiAdaptorProtocolPBClientImpl(1L, address, new Configuration())) {
@@ -161,50 +202,53 @@ public class TestCsiAdaptorService {
         address);
     conf.set(
         YarnConfiguration.NM_CSI_DRIVER_PREFIX + "test-driver.endpoint",
-        "unix:///tmp/test-driver.scok");
-    CsiAdaptorProtocolService service =
-        new CsiAdaptorProtocolService("test-driver", domainSocket);
-    service.init(conf);
-    service.start();
+        "unix:///tmp/test-driver.sock");
 
-    // inject a fake CSI client
+    // inject a fake CSI adaptor
     // this client validates if the ValidateVolumeCapabilitiesRequest
     // is integrity, and then reply a fake response
-    service.setCsiClient(new ICsiClientTest() {
+    FakeCsiAdaptor plugin = new FakeCsiAdaptor() {
+      @Override
+      public String getDriverName() {
+        return "test-driver";
+      }
+
       @Override
-      public Csi.GetPluginInfoResponse getPluginInfo() {
-        return Csi.GetPluginInfoResponse.newBuilder()
-            .setName("test-plugin")
-            .setVendorVersion("0.1")
-            .build();
+      public GetPluginInfoResponse getPluginInfo(
+          GetPluginInfoRequest request) throws YarnException, IOException {
+        return GetPluginInfoResponse.newInstance("test-plugin", "0.1");
       }
 
       @Override
-      public Csi.ValidateVolumeCapabilitiesResponse validateVolumeCapabilities(
-          Csi.ValidateVolumeCapabilitiesRequest request) {
+      public ValidateVolumeCapabilitiesResponse validateVolumeCapacity(
+          ValidateVolumeCapabilitiesRequest request)
+          throws YarnException, IOException {
         // validate we get all info from the request
         Assert.assertEquals("volume-id-0000123", request.getVolumeId());
-        Assert.assertEquals(1, request.getVolumeCapabilitiesCount());
-        Assert.assertEquals(Csi.VolumeCapability.AccessMode
-                .newBuilder().setModeValue(5).build(),
-            request.getVolumeCapabilities(0).getAccessMode());
-        Assert.assertTrue(request.getVolumeCapabilities(0).hasMount());
-        Assert.assertEquals(2, request.getVolumeCapabilities(0)
-            .getMount().getMountFlagsCount());
-        Assert.assertTrue(request.getVolumeCapabilities(0)
-            .getMount().getMountFlagsList().contains("mountFlag1"));
-        Assert.assertTrue(request.getVolumeCapabilities(0)
-            .getMount().getMountFlagsList().contains("mountFlag2"));
-        Assert.assertEquals(2, request.getVolumeAttributesCount());
-        Assert.assertEquals("v1", request.getVolumeAttributesMap().get("k1"));
-        Assert.assertEquals("v2", request.getVolumeAttributesMap().get("k2"));
+        Assert.assertEquals(1, request.getVolumeCapabilities().size());
+        Assert.assertEquals(
+            Csi.VolumeCapability.AccessMode.newBuilder().setModeValue(5)
+                .build().getMode().name(),
+            request.getVolumeCapabilities().get(0).getAccessMode().name());
+        Assert.assertEquals(2,
+            request.getVolumeCapabilities().get(0).getMountFlags().size());
+        
Assert.assertTrue(request.getVolumeCapabilities().get(0).getMountFlags()
+            .contains("mountFlag1"));
+        
Assert.assertTrue(request.getVolumeCapabilities().get(0).getMountFlags()
+            .contains("mountFlag2"));
+        Assert.assertEquals(2, request.getVolumeAttributes().size());
+        Assert.assertEquals("v1", request.getVolumeAttributes().get("k1"));
+        Assert.assertEquals("v2", request.getVolumeAttributes().get("k2"));
         // return a fake result
-        return Csi.ValidateVolumeCapabilitiesResponse.newBuilder()
-            .setSupported(false)
-            .setMessage("this is a test")
-            .build();
+        return ValidateVolumeCapabilitiesResponse
+            .newInstance(false, "this is a test");
       }
-    });
+    };
+
+    CsiAdaptorProtocolService service =
+        new CsiAdaptorProtocolService(plugin);
+    service.init(conf);
+    service.start();
 
     YarnRPC rpc = YarnRPC.create(conf);
     UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
@@ -232,7 +276,7 @@ public class TestCsiAdaptorService {
   public void testMissingConfiguration() {
     Configuration conf = new Configuration();
     CsiAdaptorProtocolService service =
-        new CsiAdaptorProtocolService("test-driver", domainSocket);
+        new CsiAdaptorProtocolService(new FakeCsiAdaptor() {});
     service.init(conf);
   }
 
@@ -243,7 +287,7 @@ public class TestCsiAdaptorService {
         + "test-driver-0001.address",
         "0.0.0.0:-100"); // this is an invalid address
     CsiAdaptorProtocolService service =
-        new CsiAdaptorProtocolService("test-driver-0001", domainSocket);
+        new CsiAdaptorProtocolService(new FakeCsiAdaptor() {});
     service.init(conf);
   }
 
@@ -254,7 +298,151 @@ public class TestCsiAdaptorService {
             + "test-driver-0001.address",
         "192.0.1:8999"); // this is an invalid ip address
     CsiAdaptorProtocolService service =
-        new CsiAdaptorProtocolService("test-driver-0001", domainSocket);
+        new CsiAdaptorProtocolService(new FakeCsiAdaptor() {});
     service.init(conf);
   }
+
+  @Test
+  public void testCustomizedAdaptor() throws IOException, YarnException {
+    ServerSocket ss = new ServerSocket(0);
+    ss.close();
+    InetSocketAddress address = new InetSocketAddress(ss.getLocalPort());
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.NM_CSI_DRIVER_NAMES, "customized-driver");
+    conf.setSocketAddr(
+        YarnConfiguration.NM_CSI_ADAPTOR_PREFIX + "customized-driver.address",
+        address);
+    conf.set(
+        YarnConfiguration.NM_CSI_ADAPTOR_PREFIX + "customized-driver.class",
+        "org.apache.hadoop.yarn.csi.adaptor.MockCsiAdaptor");
+    conf.set(
+        YarnConfiguration.NM_CSI_DRIVER_PREFIX + "customized-driver.endpoint",
+        "unix:///tmp/customized-driver.sock");
+
+    CsiAdaptorServices services =
+        new CsiAdaptorServices();
+    services.init(conf);
+    services.start();
+
+    YarnRPC rpc = YarnRPC.create(conf);
+    UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+    CsiAdaptorProtocol adaptorClient = NMProxy
+        .createNMProxy(conf, CsiAdaptorProtocol.class, currentUser, rpc,
+            NetUtils.createSocketAddrForHost("localhost", ss.getLocalPort()));
+
+    // Test getPluginInfo
+    GetPluginInfoResponse pluginInfo =
+        adaptorClient.getPluginInfo(GetPluginInfoRequest.newInstance());
+    Assert.assertEquals(pluginInfo.getDriverName(), "customized-driver");
+    Assert.assertEquals(pluginInfo.getVersion(), "1.0");
+
+    // Test validateVolumeCapacity
+    ValidateVolumeCapabilitiesRequest request =
+        ValidateVolumeCapabilitiesRequestPBImpl
+            .newInstance("volume-id-0000123",
+                ImmutableList.of(new ValidateVolumeCapabilitiesRequest
+                    .VolumeCapability(
+                    MULTI_NODE_MULTI_WRITER, FILE_SYSTEM,
+                    ImmutableList.of("mountFlag1", "mountFlag2"))),
+                ImmutableMap.of("k1", "v1", "k2", "v2"));
+
+    ValidateVolumeCapabilitiesResponse response = adaptorClient
+        .validateVolumeCapacity(request);
+    Assert.assertEquals(true, response.isSupported());
+    Assert.assertEquals("verified via MockCsiAdaptor",
+        response.getResponseMessage());
+
+    services.stop();
+  }
+
+  @Test
+  public void testMultipleCsiAdaptors() throws IOException, YarnException {
+    ServerSocket driver1Addr = new ServerSocket(0);
+    ServerSocket driver2Addr = new ServerSocket(0);
+
+    InetSocketAddress address1 =
+        new InetSocketAddress(driver1Addr.getLocalPort());
+    InetSocketAddress address2 =
+        new InetSocketAddress(driver2Addr.getLocalPort());
+
+    Configuration conf = new Configuration();
+
+    // Two csi-drivers configured
+    conf.set(YarnConfiguration.NM_CSI_DRIVER_NAMES,
+        "customized-driver-1,customized-driver-2");
+
+    // customized-driver-1
+    conf.setSocketAddr(YarnConfiguration.NM_CSI_ADAPTOR_PREFIX
+            + "customized-driver-1.address", address1);
+    conf.set(YarnConfiguration.NM_CSI_ADAPTOR_PREFIX
+            + "customized-driver-1.class",
+        "org.apache.hadoop.yarn.csi.adaptor.MockCsiAdaptor");
+    conf.set(YarnConfiguration.NM_CSI_DRIVER_PREFIX
+            + "customized-driver-1.endpoint",
+        "unix:///tmp/customized-driver-1.sock");
+
+    // customized-driver-2
+    conf.setSocketAddr(YarnConfiguration.NM_CSI_ADAPTOR_PREFIX
+        + "customized-driver-2.address", address2);
+    conf.set(YarnConfiguration.NM_CSI_ADAPTOR_PREFIX
+            + "customized-driver-2.class",
+        "org.apache.hadoop.yarn.csi.adaptor.MockCsiAdaptor");
+    conf.set(YarnConfiguration.NM_CSI_DRIVER_PREFIX
+            + "customized-driver-2.endpoint",
+        "unix:///tmp/customized-driver-2.sock");
+
+    driver1Addr.close();
+    driver2Addr.close();
+
+    CsiAdaptorServices services =
+        new CsiAdaptorServices();
+    services.init(conf);
+    services.start();
+
+    YarnRPC rpc = YarnRPC.create(conf);
+    UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+    CsiAdaptorProtocol client1 = NMProxy
+        .createNMProxy(conf, CsiAdaptorProtocol.class, currentUser, rpc,
+            NetUtils.createSocketAddrForHost("localhost",
+                driver1Addr.getLocalPort()));
+
+    // ***************************************************
+    // Verify talking with customized-driver-1
+    // ***************************************************
+    // Test getPluginInfo
+    GetPluginInfoResponse pluginInfo =
+        client1.getPluginInfo(GetPluginInfoRequest.newInstance());
+    Assert.assertEquals(pluginInfo.getDriverName(), "customized-driver-1");
+    Assert.assertEquals(pluginInfo.getVersion(), "1.0");
+
+    // Test validateVolumeCapacity
+    ValidateVolumeCapabilitiesRequest request =
+        ValidateVolumeCapabilitiesRequestPBImpl
+            .newInstance("driver-1-volume-00001",
+                ImmutableList.of(new ValidateVolumeCapabilitiesRequest
+                    .VolumeCapability(
+                    MULTI_NODE_MULTI_WRITER, FILE_SYSTEM,
+                    ImmutableList.of())), ImmutableMap.of());
+
+    ValidateVolumeCapabilitiesResponse response = client1
+        .validateVolumeCapacity(request);
+    Assert.assertEquals(true, response.isSupported());
+    Assert.assertEquals("verified via MockCsiAdaptor",
+        response.getResponseMessage());
+
+
+    // ***************************************************
+    // Verify talking with customized-driver-2
+    // ***************************************************
+    CsiAdaptorProtocol client2 = NMProxy
+        .createNMProxy(conf, CsiAdaptorProtocol.class, currentUser, rpc,
+            NetUtils.createSocketAddrForHost("localhost",
+                driver2Addr.getLocalPort()));
+    GetPluginInfoResponse pluginInfo2 =
+        client2.getPluginInfo(GetPluginInfoRequest.newInstance());
+    Assert.assertEquals(pluginInfo2.getDriverName(), "customized-driver-2");
+    Assert.assertEquals(pluginInfo2.getVersion(), "1.0");
+
+    services.stop();
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to