This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new df0460e4161 HBASE-27902 Utility to invoke coproc on multiple servers 
using AsyncAdmin (#5295)
df0460e4161 is described below

commit df0460e416191dab0bd7c2c84094230210b26e74
Author: Jing Yu <y...@salesforce.com>
AuthorDate: Tue Jun 20 17:54:22 2023 -0400

    HBASE-27902 Utility to invoke coproc on multiple servers using AsyncAdmin 
(#5295)
    
    Signed-off-by: Duo Zhang <zhang...@apache.org>
    Signed-off-by: Viraj Jasani <vjas...@apache.org>
---
 .../hadoop/hbase/client/AsyncAdminClientUtils.java |  85 +++++++++++
 ...AsyncCoprocessorOnAllRegionServersEndpoint.java | 167 +++++++++++++++++++++
 2 files changed, 252 insertions(+)

diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminClientUtils.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminClientUtils.java
new file mode 100644
index 00000000000..82269aabdb9
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminClientUtils.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import com.google.protobuf.RpcChannel;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Additional Asynchronous Admin capabilities for clients.
+ */
+@InterfaceAudience.Public
+public final class AsyncAdminClientUtils {
+
+  private AsyncAdminClientUtils() {
+  }
+
+  /**
+   * Execute the given coprocessor call on all region servers.
+   * <p>
+   * The {@code stubMaker} is just a delegation to the {@code newStub} call. 
Usually it is only a
+   * one line lambda expression, like:
+   *
+   * <pre>
+   * channel -&gt; xxxService.newStub(channel)
+   * </pre>
+   *
+   * @param asyncAdmin the asynchronous administrative API for HBase.
+   * @param stubMaker  a delegation to the actual {@code newStub} call.
+   * @param callable   a delegation to the actual protobuf rpc call. See the 
comment of
+   *                   {@link ServiceCaller} for more details.
+   * @param <S>        the type of the asynchronous stub
+   * @param <R>        the type of the return value
+   * @return Map of each region server to its result of the protobuf rpc call, 
wrapped by a
+   *         {@link CompletableFuture}.
+   * @see ServiceCaller
+   */
+  public static <S, R> CompletableFuture<Map<ServerName, Object>>
+    coprocessorServiceOnAllRegionServers(AsyncAdmin asyncAdmin, 
Function<RpcChannel, S> stubMaker,
+      ServiceCaller<S, R> callable) {
+    CompletableFuture<Map<ServerName, Object>> future = new 
CompletableFuture<>();
+    FutureUtils.addListener(asyncAdmin.getRegionServers(), (regionServers, 
error) -> {
+      if (error != null) {
+        future.completeExceptionally(error);
+        return;
+      }
+      Map<ServerName, Object> resultMap = new ConcurrentHashMap<>();
+      for (ServerName regionServer : regionServers) {
+        FutureUtils.addListener(asyncAdmin.coprocessorService(stubMaker, 
callable, regionServer),
+          (server, err) -> {
+            if (err != null) {
+              resultMap.put(regionServer, err);
+            } else {
+              resultMap.put(regionServer, server);
+            }
+            if (resultMap.size() == regionServers.size()) {
+              future.complete(Collections.unmodifiableMap(resultMap));
+            }
+          });
+      }
+    });
+    return future;
+  }
+}
diff --git 
a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncCoprocessorOnAllRegionServersEndpoint.java
 
b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncCoprocessorOnAllRegionServersEndpoint.java
new file mode 100644
index 00000000000..018c6758802
--- /dev/null
+++ 
b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncCoprocessorOnAllRegionServersEndpoint.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+import java.io.FileNotFoundException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.AsyncAdminClientUtils;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.RetriesExhaustedException;
+import org.apache.hadoop.hbase.client.ServiceCaller;
+import org.apache.hadoop.hbase.client.TestAsyncAdminBase;
+import 
org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyRequest;
+import 
org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyResponse;
+import 
org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyService;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+@Category({ ClientTests.class, MediumTests.class })
+public class TestAsyncCoprocessorOnAllRegionServersEndpoint extends 
TestAsyncAdminBase {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    
HBaseClassTestRule.forClass(TestAsyncCoprocessorOnAllRegionServersEndpoint.class);
+
+  private static final String THROW_CLASS_NAME = 
"java.io.FileNotFoundException";
+  private static final String DUMMY_VALUE = "val";
+  private static final int NUM_SLAVES = 5;
+  private static final int NUM_SUCCESS_REGION_SERVERS = 3;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 
60000);
+    
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 
120000);
+    
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
+    
TEST_UTIL.getConfiguration().setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
+      ProtobufCoprocessorService.class.getName());
+    
TEST_UTIL.getConfiguration().setStrings(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY,
+      DummyRegionServerEndpoint.class.getName());
+    TEST_UTIL.startMiniCluster(NUM_SLAVES);
+    ASYNC_CONN = 
ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testRegionServersCoprocessorService()
+    throws ExecutionException, InterruptedException {
+    DummyRequest request = DummyRequest.getDefaultInstance();
+    Map<ServerName,
+      Object> resultMap = 
AsyncAdminClientUtils.coprocessorServiceOnAllRegionServers(admin,
+        DummyService::newStub, (ServiceCaller<DummyService.Stub, 
DummyResponse>) (stub, controller,
+          rpcCallback) -> stub.dummyCall(controller, request, rpcCallback))
+        .get();
+    resultMap.forEach((k, v) -> {
+      assertTrue(v instanceof DummyResponse);
+      DummyResponse resp = (DummyResponse) v;
+      assertEquals(DUMMY_VALUE, resp.getValue());
+    });
+  }
+
+  @Test
+  public void testRegionServerCoprocessorsServiceAllFail()
+    throws ExecutionException, InterruptedException {
+    DummyRequest request = DummyRequest.getDefaultInstance();
+    Map<ServerName,
+      Object> resultMap = 
AsyncAdminClientUtils.coprocessorServiceOnAllRegionServers(admin,
+        DummyService::newStub, (ServiceCaller<DummyService.Stub, 
DummyResponse>) (stub, controller,
+          rpcCallback) -> stub.dummyThrow(controller, request, rpcCallback))
+        .get();
+
+    resultMap.forEach((k, v) -> {
+      assertTrue(v instanceof RetriesExhaustedException);
+      Throwable e = (Throwable) v;
+      assertTrue(e.getMessage().contains(THROW_CLASS_NAME));
+    });
+  }
+
+  @Test
+  public void testRegionServerCoprocessorsServicePartialFail()
+    throws ExecutionException, InterruptedException {
+    DummyRequest request = DummyRequest.getDefaultInstance();
+    AtomicInteger callCount = new AtomicInteger();
+    Map<ServerName, Object> resultMap =
+      AsyncAdminClientUtils.coprocessorServiceOnAllRegionServers(admin, 
DummyService::newStub,
+        (ServiceCaller<DummyService.Stub, DummyResponse>) (stub, controller, 
rpcCallback) -> {
+          callCount.addAndGet(1);
+          if (callCount.get() <= NUM_SUCCESS_REGION_SERVERS) {
+            stub.dummyCall(controller, request, rpcCallback);
+          } else {
+            stub.dummyThrow(controller, request, rpcCallback);
+          }
+        }).get();
+
+    AtomicInteger successCallCount = new AtomicInteger();
+    resultMap.forEach((k, v) -> {
+      if (v instanceof DummyResponse) {
+        successCallCount.addAndGet(1);
+        DummyResponse resp = (DummyResponse) v;
+        assertEquals(DUMMY_VALUE, resp.getValue());
+      } else {
+        assertTrue(v instanceof RetriesExhaustedException);
+        Throwable e = (Throwable) v;
+        assertTrue(e.getMessage().contains(THROW_CLASS_NAME));
+      }
+    });
+    assertEquals(NUM_SUCCESS_REGION_SERVERS, successCallCount.get());
+  }
+
+  public static class DummyRegionServerEndpoint extends DummyService
+    implements RegionServerCoprocessor {
+    @Override
+    public Iterable<Service> getServices() {
+      return Collections.singleton(this);
+    }
+
+    @Override
+    public void dummyCall(RpcController controller, DummyRequest request,
+      RpcCallback<DummyResponse> callback) {
+      callback.run(DummyResponse.newBuilder().setValue(DUMMY_VALUE).build());
+    }
+
+    @Override
+    public void dummyThrow(RpcController controller, DummyRequest request,
+      RpcCallback<DummyResponse> done) {
+      CoprocessorRpcUtils.setControllerException(controller,
+        new FileNotFoundException("/file.txt"));
+    }
+  }
+}

Reply via email to