This is an automated email from the ASF dual-hosted git repository.

macduan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new bdffcaa  Introduce the extraProperties to support user-defined 
pluggable accessCheckers (#42)
bdffcaa is described below

commit bdffcaa58b514f22f2f2da72c8e791cef4ac00f3
Author: Junfan Zhang <[email protected]>
AuthorDate: Mon Jul 18 14:47:21 2022 +0800

    Introduce the extraProperties to support user-defined pluggable 
accessCheckers (#42)
    
    ###  What changes were proposed in this pull request?
    Introduce extraProperties so that more pluggable accessCheckers can be added
    
    ###  Why are the changes needed?
    In the current codebase, AccessInfo only has the accessId and tags. If we 
want to add more AccessCheckers to the coordinator, this info is not enough.
    
    To solve this, I think introducing extraProperties is necessary.
    
    ###  Does this PR introduce any user-facing change?
    No.
    
    ###  How was this patch tested?
    UTs
---
 .../org/apache/uniffle/coordinator/AccessInfo.java | 19 ++++--
 .../coordinator/CoordinatorGrpcService.java        |  7 +-
 .../uniffle/coordinator/AccessManagerTest.java     |  3 +-
 .../org/apache/uniffle/test/AccessClusterTest.java | 76 ++++++++++++++++++++--
 .../client/impl/grpc/CoordinatorGrpcClient.java    |  1 +
 .../client/request/RssAccessClusterRequest.java    | 23 +++++++
 proto/src/main/proto/Rss.proto                     |  1 +
 7 files changed, 119 insertions(+), 11 deletions(-)

diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessInfo.java 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessInfo.java
index f8b206f..8200fa7 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessInfo.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessInfo.java
@@ -17,6 +17,8 @@
 
 package org.apache.uniffle.coordinator;
 
+import java.util.Collections;
+import java.util.Map;
 import java.util.Set;
 
 import com.google.common.collect.Sets;
@@ -24,14 +26,16 @@ import com.google.common.collect.Sets;
 public class AccessInfo {
   private final String accessId;
   private final Set<String> tags;
+  private final Map<String, String> extraProperties;
 
-  public AccessInfo(String accessId, Set<String> tags) {
+  public AccessInfo(String accessId, Set<String> tags, Map<String, String> 
extraProperties) {
     this.accessId = accessId;
     this.tags = tags;
+    this.extraProperties = extraProperties == null ? Collections.emptyMap() : 
extraProperties;
   }
 
   public AccessInfo(String accessId) {
-    this(accessId, Sets.newHashSet());
+    this(accessId, Sets.newHashSet(), Collections.emptyMap());
   }
 
   public String getAccessId() {
@@ -42,11 +46,16 @@ public class AccessInfo {
     return tags;
   }
 
+  public Map<String, String> getExtraProperties() {
+    return extraProperties;
+  }
+
   @Override
   public String toString() {
     return "AccessInfo{"
-        + "accessId='" + accessId + '\''
-        + ", tags=" + tags
-        + '}';
+            + "accessId='" + accessId + '\''
+            + ", tags=" + tags
+            + ", extraProperties=" + extraProperties
+            + '}';
   }
 }
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
index 4fcff39..b3bbf72 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
@@ -206,7 +206,12 @@ public class CoordinatorGrpcService extends 
CoordinatorServerGrpc.CoordinatorSer
     AccessClusterResponse response;
     AccessManager accessManager = coordinatorServer.getAccessManager();
 
-    AccessInfo accessInfo = new AccessInfo(request.getAccessId(), 
Sets.newHashSet(request.getTagsList()));
+    AccessInfo accessInfo =
+            new AccessInfo(
+                request.getAccessId(),
+                Sets.newHashSet(request.getTagsList()),
+                request.getExtraPropertiesMap()
+            );
     AccessCheckResult result = accessManager.handleAccessRequest(accessInfo);
     if (!result.isSuccess()) {
       statusCode = StatusCode.ACCESS_DENIED;
diff --git 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessManagerTest.java
 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessManagerTest.java
index bd204e2..145e694 100644
--- 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessManagerTest.java
+++ 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessManagerTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.uniffle.coordinator;
 
+import java.util.Collections;
 import java.util.Random;
 
 import com.google.common.collect.Sets;
@@ -66,7 +67,7 @@ public class AccessManagerTest {
     AccessManager accessManager = new AccessManager(conf, null, new 
Configuration());
     assertTrue(accessManager.handleAccessRequest(
             new AccessInfo(String.valueOf(new Random().nextInt()),
-                Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION)))
+                Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION), 
Collections.emptyMap()))
         .isSuccess());
     accessManager.close();
     // test mock checkers
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java
index 6d89300..149ab88 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java
@@ -18,13 +18,19 @@
 package org.apache.uniffle.test;
 
 import java.io.FileWriter;
+import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.File;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Uninterruptibles;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
 import org.apache.uniffle.client.api.CoordinatorClient;
 import org.apache.uniffle.client.factory.CoordinatorClientFactory;
@@ -32,16 +38,78 @@ import 
org.apache.uniffle.client.request.RssAccessClusterRequest;
 import org.apache.uniffle.client.response.ResponseStatusCode;
 import org.apache.uniffle.client.response.RssAccessClusterResponse;
 import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.coordinator.AccessCheckResult;
+import org.apache.uniffle.coordinator.AccessChecker;
+import org.apache.uniffle.coordinator.AccessInfo;
+import org.apache.uniffle.coordinator.AccessManager;
 import org.apache.uniffle.coordinator.CoordinatorConf;
 import org.apache.uniffle.server.ShuffleServer;
 import org.apache.uniffle.server.ShuffleServerConf;
-import org.junit.jupiter.api.io.TempDir;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class AccessClusterTest extends CoordinatorTestBase {
 
+  public static class MockedAccessChecker implements AccessChecker {
+    final String key = "key";
+    final List<String> legalNames = Arrays.asList("v1", "v2", "v3");
+
+    public MockedAccessChecker(AccessManager accessManager) throws Exception {
+      // ignore
+    }
+
+    @Override
+    public AccessCheckResult check(AccessInfo accessInfo) {
+      Map<String, String> reservedData = accessInfo.getExtraProperties();
+      if (legalNames.contains(reservedData.get(key))) {
+        return new AccessCheckResult(true, "");
+      }
+      return new AccessCheckResult(false, "");
+    }
+
+    @Override
+    public void close() throws IOException {
+      // ignore.
+    }
+  }
+
+  @Test
+  public void testUsingCustomExtraProperties() throws Exception {
+    CoordinatorConf coordinatorConf = getCoordinatorConf();
+    coordinatorConf.setString(
+            "rss.coordinator.access.checkers",
+            "org.apache.uniffle.test.AccessClusterTest$MockedAccessChecker");
+    createCoordinatorServer(coordinatorConf);
+    startServers();
+    Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);
+
+    // case1: empty map
+    String accessID = "acessid";
+    RssAccessClusterRequest request = new RssAccessClusterRequest(
+            accessID, Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION), 2000);
+    RssAccessClusterResponse response = 
coordinatorClient.accessCluster(request);
+    assertEquals(ResponseStatusCode.ACCESS_DENIED, response.getStatusCode());
+
+    // case2: illegal names
+    Map<String, String> extraProperties = new HashMap<>();
+    extraProperties.put("key", "illegalName");
+    request = new RssAccessClusterRequest(
+            accessID, Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION), 2000, 
extraProperties);
+    response = coordinatorClient.accessCluster(request);
+    assertEquals(ResponseStatusCode.ACCESS_DENIED, response.getStatusCode());
+
+    // case3: legal names
+    extraProperties.clear();
+    extraProperties.put("key", "v1");
+    request = new RssAccessClusterRequest(
+            accessID, Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION), 2000, 
extraProperties);
+    response = coordinatorClient.accessCluster(request);
+    assertEquals(ResponseStatusCode.SUCCESS, response.getStatusCode());
+
+    shutdownServers();
+  }
+
   @Test
   public void test(@TempDir File tempDir) throws Exception {
     File cfgFile = File.createTempFile("tmp", ".conf", tempDir);
@@ -57,14 +125,13 @@ public class AccessClusterTest extends CoordinatorTestBase 
{
     
coordinatorConf.setInteger("rss.coordinator.access.loadChecker.serverNum.threshold",
 2);
     coordinatorConf.setString("rss.coordinator.access.candidates.path", 
cfgFile.getAbsolutePath());
     coordinatorConf.setString(
-        "rss.coordinator.access.checkers",
-        
"org.apache.uniffle.coordinator.AccessCandidatesChecker,org.apache.uniffle.coordinator.AccessClusterLoadChecker");
+            "rss.coordinator.access.checkers",
+            
"org.apache.uniffle.coordinator.AccessCandidatesChecker,org.apache.uniffle.coordinator.AccessClusterLoadChecker");
     createCoordinatorServer(coordinatorConf);
 
     ShuffleServerConf shuffleServerConf = getShuffleServerConf();
     createShuffleServer(shuffleServerConf);
     startServers();
-
     Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);
     String accessId = "111111";
     RssAccessClusterRequest request = new RssAccessClusterRequest(
@@ -100,6 +167,7 @@ public class AccessClusterTest extends CoordinatorTestBase {
     assertEquals(ResponseStatusCode.SUCCESS, response.getStatusCode());
     assertTrue(response.getMessage().startsWith("SUCCESS"));
     shuffleServer.stopServer();
+    shutdownServers();
   }
 }
 
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
index 219dadb..41b2a86 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
@@ -250,6 +250,7 @@ public class CoordinatorGrpcClient extends GrpcClient 
implements CoordinatorClie
         .newBuilder()
         .setAccessId(request.getAccessId())
         .addAllTags(request.getTags())
+        .putAllExtraProperties(request.getExtraProperties())
         .build();
     AccessClusterResponse rpcResponse;
     try {
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssAccessClusterRequest.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssAccessClusterRequest.java
index adf21f5..ac5523a 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssAccessClusterRequest.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssAccessClusterRequest.java
@@ -17,6 +17,8 @@
 
 package org.apache.uniffle.client.request;
 
+import java.util.Collections;
+import java.util.Map;
 import java.util.Set;
 
 public class RssAccessClusterRequest {
@@ -24,11 +26,28 @@ public class RssAccessClusterRequest {
   private final String accessId;
   private final Set<String> tags;
   private final int timeoutMs;
+  /**
+   * The map is to pass the extra data to the coordinator and to
+   * extend more pluggable {@code AccessCheckers} easily.
+   */
+  private final Map<String, String> extraProperties;
 
   public RssAccessClusterRequest(String accessId, Set<String> tags, int 
timeoutMs) {
     this.accessId = accessId;
     this.tags = tags;
     this.timeoutMs = timeoutMs;
+    this.extraProperties = Collections.emptyMap();
+  }
+
+  public RssAccessClusterRequest(
+      String accessId,
+      Set<String> tags,
+      int timeoutMs,
+      Map<String, String> extraProperties) {
+    this.accessId = accessId;
+    this.tags = tags;
+    this.timeoutMs = timeoutMs;
+    this.extraProperties = extraProperties;
   }
 
   public String getAccessId() {
@@ -42,4 +61,8 @@ public class RssAccessClusterRequest {
   public int getTimeoutMs() {
     return timeoutMs;
   }
+
+  public Map<String, String> getExtraProperties() {
+    return extraProperties;
+  }
 }
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index 5e16fdd..491316d 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -335,6 +335,7 @@ message CheckServiceAvailableResponse {
 message AccessClusterRequest {
   string accessId = 1;
   repeated string tags = 2;
+  map<string, string> extraProperties = 3;
 }
 
 message AccessClusterResponse {

Reply via email to