This is an automated email from the ASF dual-hosted git repository.

bbeaudreault pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new 83ea0da77a3 HBASE-27657 Connection and Request Attributes (#5326)
83ea0da77a3 is described below

commit 83ea0da77a344ae84adb5426c346350a3004bcad
Author: Ray Mattingly <rmdmattin...@gmail.com>
AuthorDate: Mon Jul 24 12:53:33 2023 -0400

    HBASE-27657 Connection and Request Attributes (#5326)
    
    Signed-off-by: Bryan Beaudreault <bbeaudrea...@apache.org>
---
 .../client/AsyncAdminRequestRetryingCaller.java    |   3 +-
 .../hbase/client/AsyncBatchRpcRetryingCaller.java  |   7 +-
 .../hadoop/hbase/client/AsyncClientScanner.java    |  31 +-
 .../hadoop/hbase/client/AsyncConnectionImpl.java   |  11 +-
 .../AsyncMasterRequestRpcRetryingCaller.java       |   3 +-
 .../hbase/client/AsyncRpcRetryingCaller.java       |   4 +-
 .../client/AsyncRpcRetryingCallerFactory.java      |  31 +-
 .../AsyncScanSingleRegionRpcRetryingCaller.java    |   4 +-
 .../AsyncServerRequestRpcRetryingCaller.java       |   3 +-
 .../AsyncSingleRequestRpcRetryingCaller.java       |   6 +-
 .../org/apache/hadoop/hbase/client/AsyncTable.java |  10 +
 .../hadoop/hbase/client/AsyncTableBuilder.java     |   5 +
 .../hadoop/hbase/client/AsyncTableBuilderBase.java |  14 +
 .../apache/hadoop/hbase/client/AsyncTableImpl.java |   6 +
 .../hadoop/hbase/client/ConnectionFactory.java     |  67 ++++-
 .../client/ConnectionOverAsyncConnection.java      |   7 +-
 .../hadoop/hbase/client/RawAsyncTableImpl.java     |  17 +-
 .../java/org/apache/hadoop/hbase/client/Table.java |   8 +
 .../apache/hadoop/hbase/client/TableBuilder.java   |   5 +
 .../hadoop/hbase/client/TableBuilderBase.java      |  14 +
 .../hadoop/hbase/client/TableOverAsyncTable.java   |   5 +
 .../apache/hadoop/hbase/ipc/AbstractRpcClient.java |  36 +--
 .../apache/hadoop/hbase/ipc/BlockingRpcClient.java |   8 +-
 .../hadoop/hbase/ipc/BlockingRpcConnection.java    |   2 +-
 .../java/org/apache/hadoop/hbase/ipc/Call.java     |   7 +-
 .../hbase/ipc/DelegatingHBaseRpcController.java    |  11 +
 .../hadoop/hbase/ipc/HBaseRpcController.java       |  11 +
 .../hadoop/hbase/ipc/HBaseRpcControllerImpl.java   |  14 +
 .../java/org/apache/hadoop/hbase/ipc/IPCUtil.java  |  11 +
 .../apache/hadoop/hbase/ipc/NettyRpcClient.java    |  11 +-
 .../hadoop/hbase/ipc/NettyRpcConnection.java       |   2 +-
 .../apache/hadoop/hbase/ipc/RpcClientFactory.java  |  13 +-
 .../org/apache/hadoop/hbase/ipc/RpcConnection.java |  15 +-
 .../client/TestRpcBasedRegistryHedgedReads.java    |   3 +-
 .../hadoop/hbase/ipc/TestTLSHandshadeFailure.java  |   4 +-
 .../hbase/mapreduce/TestHFileOutputFormat2.java    |   7 +-
 .../mapreduce/TestMultiTableInputFormatBase.java   |   3 +-
 .../hbase/mapreduce/TestTableInputFormatBase.java  |   4 +-
 .../src/main/protobuf/rpc/RPC.proto                |   2 +
 .../hbase/client/AsyncClusterConnectionImpl.java   |   3 +-
 .../AsyncRegionReplicationRetryingCaller.java      |   4 +-
 .../java/org/apache/hadoop/hbase/ipc/RpcCall.java  |   3 +
 .../org/apache/hadoop/hbase/ipc/ServerCall.java    |   6 +
 .../hadoop/hbase/client/DummyAsyncTable.java       |   6 +
 .../hadoop/hbase/client/TestClientTimeouts.java    |   5 +-
 .../client/TestRequestAndConnectionAttributes.java | 317 +++++++++++++++++++++
 .../hadoop/hbase/ipc/TestRpcClientLeaks.java       |   5 +-
 .../ipc/TestRpcServerSlowConnectionSetup.java      |   2 +-
 .../hbase/namequeues/TestNamedQueueRecorder.java   |   7 +-
 .../store/region/TestRegionProcedureStore.java     |   6 +
 .../hbase/thrift2/client/ThriftConnection.java     |   9 +-
 51 files changed, 712 insertions(+), 86 deletions(-)

diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java
index d3bec8b3cfb..f7fa7e9f03f 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client;
 import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
@@ -44,7 +45,7 @@ public class AsyncAdminRequestRetryingCaller<T> extends 
AsyncRpcRetryingCaller<T
     long pauseNs, long pauseNsForServerOverloaded, int maxAttempts, long 
operationTimeoutNs,
     long rpcTimeoutNs, int startLogErrorsCnt, ServerName serverName, 
Callable<T> callable) {
     super(retryTimer, conn, priority, pauseNs, pauseNsForServerOverloaded, 
maxAttempts,
-      operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
+      operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, 
Collections.emptyMap());
     this.serverName = serverName;
     this.callable = callable;
   }
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
index 7a8bbeb9420..c485a0a2c05 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
@@ -114,6 +114,8 @@ class AsyncBatchRpcRetryingCaller<T> {
 
   private final HBaseServerExceptionPauseManager pauseManager;
 
+  private final Map<String, byte[]> requestAttributes;
+
   // we can not use HRegionLocation as the map key because the hashCode and 
equals method of
   // HRegionLocation only consider serverName.
   private static final class RegionRequest {
@@ -149,7 +151,8 @@ class AsyncBatchRpcRetryingCaller<T> {
 
   public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl 
conn,
     TableName tableName, List<? extends Row> actions, long pauseNs, long 
pauseNsForServerOverloaded,
-    int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int 
startLogErrorsCnt) {
+    int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int 
startLogErrorsCnt,
+    Map<String, byte[]> requestAttributes) {
     this.retryTimer = retryTimer;
     this.conn = conn;
     this.tableName = tableName;
@@ -180,6 +183,7 @@ class AsyncBatchRpcRetryingCaller<T> {
     this.startNs = System.nanoTime();
     this.pauseManager =
       new HBaseServerExceptionPauseManager(pauseNs, 
pauseNsForServerOverloaded, operationTimeoutNs);
+    this.requestAttributes = requestAttributes;
   }
 
   private static boolean hasIncrementOrAppend(Row action) {
@@ -392,6 +396,7 @@ class AsyncBatchRpcRetryingCaller<T> {
     HBaseRpcController controller = conn.rpcControllerFactory.newController();
     resetController(controller, Math.min(rpcTimeoutNs, remainingNs),
       calcPriority(serverReq.getPriority(), tableName));
+    controller.setRequestAttributes(requestAttributes);
     if (!cells.isEmpty()) {
       controller.setCellScanner(createCellScanner(cells));
     }
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
index ed381df7e0d..b61f5b80c9e 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
@@ -32,6 +32,7 @@ import io.opentelemetry.api.trace.Span;
 import io.opentelemetry.api.trace.StatusCode;
 import io.opentelemetry.context.Scope;
 import java.io.IOException;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -92,9 +93,12 @@ class AsyncClientScanner {
 
   private final Span span;
 
+  private final Map<String, byte[]> requestAttributes;
+
   public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, 
TableName tableName,
     AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, long 
pauseNsForServerOverloaded,
-    int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int 
startLogErrorsCnt) {
+    int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int 
startLogErrorsCnt,
+    Map<String, byte[]> requestAttributes) {
     if (scan.getStartRow() == null) {
       scan.withStartRow(EMPTY_START_ROW, scan.includeStartRow());
     }
@@ -113,6 +117,7 @@ class AsyncClientScanner {
     this.rpcTimeoutNs = rpcTimeoutNs;
     this.startLogErrorsCnt = startLogErrorsCnt;
     this.resultCache = createScanResultCache(scan);
+    this.requestAttributes = requestAttributes;
     if (scan.isScanMetricsEnabled()) {
       this.scanMetrics = new ScanMetrics();
       consumer.onScanMetricsCreated(scanMetrics);
@@ -191,15 +196,17 @@ class AsyncClientScanner {
   }
 
   private void startScan(OpenScannerResponse resp) {
-    
addListener(conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId())
-      .location(resp.loc).remote(resp.isRegionServerRemote)
-      .scannerLeaseTimeoutPeriod(resp.resp.getTtl(), 
TimeUnit.MILLISECONDS).stub(resp.stub)
-      
.setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache)
-      .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
-      .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, 
TimeUnit.NANOSECONDS)
-      .pauseForServerOverloaded(pauseNsForServerOverloaded, 
TimeUnit.NANOSECONDS)
-      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt)
-      .start(resp.controller, resp.resp), (hasMore, error) -> {
+    addListener(
+      
conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId()).location(resp.loc)
+        .remote(resp.isRegionServerRemote)
+        .scannerLeaseTimeoutPeriod(resp.resp.getTtl(), 
TimeUnit.MILLISECONDS).stub(resp.stub)
+        
.setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache)
+        .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
+        .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, 
TimeUnit.NANOSECONDS)
+        .pauseForServerOverloaded(pauseNsForServerOverloaded, 
TimeUnit.NANOSECONDS)
+        .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt)
+        .setRequestAttributes(requestAttributes).start(resp.controller, 
resp.resp),
+      (hasMore, error) -> {
         try (Scope ignored = span.makeCurrent()) {
           if (error != null) {
             try {
@@ -231,8 +238,8 @@ class AsyncClientScanner {
         .priority(scan.getPriority()).rpcTimeout(rpcTimeoutNs, 
TimeUnit.NANOSECONDS)
         .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, 
TimeUnit.NANOSECONDS)
         .pauseForServerOverloaded(pauseNsForServerOverloaded, 
TimeUnit.NANOSECONDS)
-        
.maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner)
-        .call();
+        .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt)
+        
.setRequestAttributes(requestAttributes).action(this::callOpenScanner).call();
     }
   }
 
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 3af574cfc0b..4900581c69a 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -31,6 +31,8 @@ import static 
org.apache.hadoop.hbase.util.FutureUtils.addListener;
 import io.opentelemetry.api.trace.Span;
 import java.io.IOException;
 import java.net.SocketAddress;
+import java.util.Collections;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -127,6 +129,11 @@ public class AsyncConnectionImpl implements 
AsyncConnection {
 
   public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, 
String clusterId,
     SocketAddress localAddress, User user) {
+    this(conf, registry, clusterId, localAddress, user, 
Collections.emptyMap());
+  }
+
+  public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, 
String clusterId,
+    SocketAddress localAddress, User user, Map<String, byte[]> 
connectionAttributes) {
     this.conf = conf;
     this.user = user;
     this.metricsScope = MetricsConnection.getScope(conf, clusterId, this);
@@ -142,8 +149,8 @@ public class AsyncConnectionImpl implements AsyncConnection 
{
     } else {
       this.metrics = Optional.empty();
     }
-    this.rpcClient =
-      RpcClientFactory.createClient(conf, clusterId, localAddress, 
metrics.orElse(null));
+    this.rpcClient = RpcClientFactory.createClient(conf, clusterId, 
localAddress,
+      metrics.orElse(null), connectionAttributes);
     this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
     this.rpcTimeout =
       (int) Math.min(Integer.MAX_VALUE, 
TimeUnit.NANOSECONDS.toMillis(connConf.getRpcTimeoutNs()));
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java
index c02b80c666a..42585ea1c91 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client;
 
 import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
 
+import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
@@ -47,7 +48,7 @@ public class AsyncMasterRequestRpcRetryingCaller<T> extends 
AsyncRpcRetryingCall
     Callable<T> callable, int priority, long pauseNs, long 
pauseNsForServerOverloaded,
     int maxRetries, long operationTimeoutNs, long rpcTimeoutNs, int 
startLogErrorsCnt) {
     super(retryTimer, conn, priority, pauseNs, pauseNsForServerOverloaded, 
maxRetries,
-      operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
+      operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, 
Collections.emptyMap());
     this.callable = callable;
   }
 
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
index 8b317bfec2c..c3dd8740854 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
@@ -22,6 +22,7 @@ import static 
org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.concurrent.CompletableFuture;
@@ -78,7 +79,7 @@ public abstract class AsyncRpcRetryingCaller<T> {
 
   public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, 
int priority,
     long pauseNs, long pauseNsForServerOverloaded, int maxAttempts, long 
operationTimeoutNs,
-    long rpcTimeoutNs, int startLogErrorsCnt) {
+    long rpcTimeoutNs, int startLogErrorsCnt, Map<String, byte[]> 
requestAttributes) {
     this.retryTimer = retryTimer;
     this.conn = conn;
     this.priority = priority;
@@ -89,6 +90,7 @@ public abstract class AsyncRpcRetryingCaller<T> {
     this.future = new CompletableFuture<>();
     this.controller = conn.rpcControllerFactory.newController();
     this.controller.setPriority(priority);
+    this.controller.setRequestAttributes(requestAttributes);
     this.exceptions = new ArrayList<>();
     this.startNs = System.nanoTime();
     this.pauseManager =
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
index 2d8e7b7aabe..1ea2a1ad7dd 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
@@ -23,7 +23,9 @@ import static 
org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
 import static 
org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument;
 import static 
org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull;
 
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hbase.HRegionLocation;
@@ -83,6 +85,8 @@ class AsyncRpcRetryingCallerFactory {
 
     private int priority = PRIORITY_UNSET;
 
+    private Map<String, byte[]> requestAttributes = Collections.emptyMap();
+
     public SingleRequestCallerBuilder<T> table(TableName tableName) {
       this.tableName = tableName;
       return this;
@@ -144,6 +148,12 @@ class AsyncRpcRetryingCallerFactory {
       return this;
     }
 
+    public SingleRequestCallerBuilder<T>
+      setRequestAttributes(Map<String, byte[]> requestAttributes) {
+      this.requestAttributes = requestAttributes;
+      return this;
+    }
+
     private void preCheck() {
       checkArgument(replicaId >= 0, "invalid replica id %s", replicaId);
       checkNotNull(tableName, "tableName is null");
@@ -157,7 +167,7 @@ class AsyncRpcRetryingCallerFactory {
       preCheck();
       return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn, 
tableName, row, replicaId,
         locateType, callable, priority, pauseNs, pauseNsForServerOverloaded, 
maxAttempts,
-        operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
+        operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, 
requestAttributes);
     }
 
     /**
@@ -201,6 +211,8 @@ class AsyncRpcRetryingCallerFactory {
 
     private int priority = PRIORITY_UNSET;
 
+    private Map<String, byte[]> requestAttributes = Collections.emptyMap();
+
     public ScanSingleRegionCallerBuilder id(long scannerId) {
       this.scannerId = scannerId;
       return this;
@@ -278,6 +290,12 @@ class AsyncRpcRetryingCallerFactory {
       return this;
     }
 
+    public ScanSingleRegionCallerBuilder
+      setRequestAttributes(Map<String, byte[]> requestAttributes) {
+      this.requestAttributes = requestAttributes;
+      return this;
+    }
+
     private void preCheck() {
       checkArgument(scannerId != null, "invalid scannerId %d", scannerId);
       checkNotNull(scan, "scan is null");
@@ -293,7 +311,7 @@ class AsyncRpcRetryingCallerFactory {
       return new AsyncScanSingleRegionRpcRetryingCaller(retryTimer, conn, 
scan, scanMetrics,
         scannerId, resultCache, consumer, stub, loc, isRegionServerRemote, 
priority,
         scannerLeaseTimeoutPeriodNs, pauseNs, pauseNsForServerOverloaded, 
maxAttempts,
-        scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
+        scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, requestAttributes);
     }
 
     /**
@@ -322,6 +340,8 @@ class AsyncRpcRetryingCallerFactory {
 
     private long rpcTimeoutNs = -1L;
 
+    private Map<String, byte[]> requestAttributes = Collections.emptyMap();
+
     public BatchCallerBuilder table(TableName tableName) {
       this.tableName = tableName;
       return this;
@@ -362,10 +382,15 @@ class AsyncRpcRetryingCallerFactory {
       return this;
     }
 
+    public BatchCallerBuilder setRequestAttributes(Map<String, byte[]> 
requestAttributes) {
+      this.requestAttributes = requestAttributes;
+      return this;
+    }
+
     public <T> AsyncBatchRpcRetryingCaller<T> build() {
       return new AsyncBatchRpcRetryingCaller<>(retryTimer, conn, tableName, 
actions, pauseNs,
         pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs, 
rpcTimeoutNs,
-        startLogErrorsCnt);
+        startLogErrorsCnt, requestAttributes);
     }
 
     public <T> List<CompletableFuture<T>> call() {
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
index ca39051de84..a5d4ef6407e 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
@@ -31,6 +31,7 @@ import io.opentelemetry.context.Scope;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.concurrent.CompletableFuture;
@@ -316,7 +317,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
     AdvancedScanResultConsumer consumer, Interface stub, HRegionLocation loc,
     boolean isRegionServerRemote, int priority, long 
scannerLeaseTimeoutPeriodNs, long pauseNs,
     long pauseNsForServerOverloaded, int maxAttempts, long scanTimeoutNs, long 
rpcTimeoutNs,
-    int startLogErrorsCnt) {
+    int startLogErrorsCnt, Map<String, byte[]> requestAttributes) {
     this.retryTimer = retryTimer;
     this.conn = conn;
     this.scan = scan;
@@ -341,6 +342,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
     this.priority = priority;
     this.controller = conn.rpcControllerFactory.newController();
     this.controller.setPriority(priority);
+    this.controller.setRequestAttributes(requestAttributes);
     this.exceptions = new ArrayList<>();
     this.pauseManager =
       new HBaseServerExceptionPauseManager(pauseNs, 
pauseNsForServerOverloaded, scanTimeoutNs);
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java
index 40cd3b87e92..d4484ba87bf 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client;
 import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
@@ -49,7 +50,7 @@ public class AsyncServerRequestRpcRetryingCaller<T> extends 
AsyncRpcRetryingCall
     long pauseNs, long pauseNsForServerOverloaded, int maxAttempts, long 
operationTimeoutNs,
     long rpcTimeoutNs, int startLogErrorsCnt, ServerName serverName, 
Callable<T> callable) {
     super(retryTimer, conn, HConstants.NORMAL_QOS, pauseNs, 
pauseNsForServerOverloaded, maxAttempts,
-      operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
+      operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, 
Collections.emptyMap());
     this.serverName = serverName;
     this.callable = callable;
   }
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
index 9c115af97b5..a0d536aef5f 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client;
 import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
 
 import java.io.IOException;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import org.apache.hadoop.hbase.HRegionLocation;
@@ -57,9 +58,10 @@ class AsyncSingleRequestRpcRetryingCaller<T> extends 
AsyncRpcRetryingCaller<T> {
   public AsyncSingleRequestRpcRetryingCaller(Timer retryTimer, 
AsyncConnectionImpl conn,
     TableName tableName, byte[] row, int replicaId, RegionLocateType 
locateType,
     Callable<T> callable, int priority, long pauseNs, long 
pauseNsForServerOverloaded,
-    int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int 
startLogErrorsCnt) {
+    int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int 
startLogErrorsCnt,
+    Map<String, byte[]> requestAttributes) {
     super(retryTimer, conn, priority, pauseNs, pauseNsForServerOverloaded, 
maxAttempts,
-      operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
+      operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, requestAttributes);
     this.tableName = tableName;
     this.row = row;
     this.replicaId = replicaId;
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
index 3c03444cfbb..2979c668988 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
@@ -22,9 +22,11 @@ import static 
org.apache.hadoop.hbase.client.ConnectionUtils.toCheckExistenceOnl
 import static org.apache.hadoop.hbase.util.FutureUtils.allOf;
 
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
+import org.apache.commons.lang3.NotImplementedException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CompareOperator;
 import org.apache.hadoop.hbase.TableName;
@@ -110,6 +112,14 @@ public interface AsyncTable<C extends 
ScanResultConsumerBase> {
    */
   long getScanTimeout(TimeUnit unit);
 
+  /**
+   * Get the map of request attributes
+   * @return a map of request attributes supplied by the client
+   */
+  default Map<String, byte[]> getRequestAttributes() {
+    throw new NotImplementedException("Add an implementation!");
+  }
+
   /**
    * Test for the existence of columns in the table, as specified by the Get.
    * <p>
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java
index f6db89f82bf..007f7ad4868 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java
@@ -137,6 +137,11 @@ public interface AsyncTableBuilder<C extends 
ScanResultConsumerBase> {
    */
   AsyncTableBuilder<C> setStartLogErrorsCnt(int startLogErrorsCnt);
 
+  /**
+   * Set a request attribute
+   */
+  AsyncTableBuilder<C> setRequestAttribute(String key, byte[] value);
+
   /**
    * Create the {@link AsyncTable} instance.
    */
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java
index 624d6e1dbb0..02e9da0770b 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java
@@ -19,6 +19,9 @@ package org.apache.hadoop.hbase.client;
 
 import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -50,6 +53,8 @@ abstract class AsyncTableBuilderBase<C extends 
ScanResultConsumerBase>
 
   protected int startLogErrorsCnt;
 
+  protected Map<String, byte[]> requestAttributes = Collections.emptyMap();
+
   AsyncTableBuilderBase(TableName tableName, AsyncConnectionConfiguration 
connConf) {
     this.tableName = tableName;
     this.operationTimeoutNs = tableName.isSystemTable()
@@ -121,4 +126,13 @@ abstract class AsyncTableBuilderBase<C extends 
ScanResultConsumerBase>
     this.startLogErrorsCnt = startLogErrorsCnt;
     return this;
   }
+
+  @Override
+  public AsyncTableBuilder<C> setRequestAttribute(String key, byte[] value) {
+    if (this.requestAttributes.isEmpty()) {
+      this.requestAttributes = new HashMap<>();
+    }
+    this.requestAttributes.put(key, value);
+    return this;
+  }
 }
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
index e785e587ab3..590ee9bc47a 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
@@ -24,6 +24,7 @@ import io.opentelemetry.context.Context;
 import io.opentelemetry.context.Scope;
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -101,6 +102,11 @@ class AsyncTableImpl implements 
AsyncTable<ScanResultConsumer> {
     return rawTable.getScanTimeout(unit);
   }
 
+  @Override
+  public Map<String, byte[]> getRequestAttributes() {
+    return rawTable.getRequestAttributes();
+  }
+
   private <T> CompletableFuture<T> wrap(CompletableFuture<T> future) {
     return FutureUtils.wrapFuture(future, pool);
   }
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
index 4d4559f4b7a..ac70091dcf6 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
@@ -22,6 +22,8 @@ import static 
org.apache.hadoop.hbase.util.FutureUtils.addListener;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.security.PrivilegedExceptionAction;
+import java.util.Collections;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import org.apache.hadoop.conf.Configuration;
@@ -216,21 +218,53 @@ public class ConnectionFactory {
    */
   public static Connection createConnection(Configuration conf, 
ExecutorService pool,
     final User user) throws IOException {
+    return createConnection(conf, pool, user, Collections.emptyMap());
+  }
+
+  /**
+   * Create a new Connection instance using the passed <code>conf</code> 
instance. Connection
+   * encapsulates all housekeeping for a connection to the cluster. All tables 
and interfaces
+   * created from returned connection share zookeeper connection, meta cache, 
and connections to
+   * region servers and masters. <br>
+   * The caller is responsible for calling {@link Connection#close()} on the 
returned connection
+   * instance. Typical usage:
+   *
+   * <pre>
+   * Connection connection = ConnectionFactory.createConnection(conf);
+   * Table table = connection.getTable(TableName.valueOf("table1"));
+   * try {
+   *   table.get(...);
+   *   ...
+   * } finally {
+   *   table.close();
+   *   connection.close();
+   * }
+   * </pre>
+   *
+   * @param conf                 configuration
+   * @param user                 the user the connection is for
+   * @param pool                 the thread pool to use for batch operations
+   * @param connectionAttributes attributes to be sent along to server during 
connection establish
+   * @return Connection object for <code>conf</code>
+   */
+  public static Connection createConnection(Configuration conf, 
ExecutorService pool,
+    final User user, Map<String, byte[]> connectionAttributes) throws 
IOException {
     Class<?> clazz = 
conf.getClass(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL,
       ConnectionOverAsyncConnection.class, Connection.class);
     if (clazz != ConnectionOverAsyncConnection.class) {
       try {
         // Default HCM#HCI is not accessible; make it so before invoking.
-        Constructor<?> constructor =
-          clazz.getDeclaredConstructor(Configuration.class, 
ExecutorService.class, User.class);
+        Constructor<?> constructor = 
clazz.getDeclaredConstructor(Configuration.class,
+          ExecutorService.class, User.class, Map.class);
         constructor.setAccessible(true);
-        return user.runAs((PrivilegedExceptionAction<
-          Connection>) () -> (Connection) constructor.newInstance(conf, pool, 
user));
+        return user.runAs((PrivilegedExceptionAction<Connection>) () -> 
(Connection) constructor
+          .newInstance(conf, pool, user, connectionAttributes));
       } catch (Exception e) {
         throw new IOException(e);
       }
     } else {
-      return FutureUtils.get(createAsyncConnection(conf, user)).toConnection();
+      return FutureUtils.get(createAsyncConnection(conf, user, 
connectionAttributes))
+        .toConnection();
     }
   }
 
@@ -281,6 +315,27 @@ public class ConnectionFactory {
    */
   public static CompletableFuture<AsyncConnection> 
createAsyncConnection(Configuration conf,
     final User user) {
+    return createAsyncConnection(conf, user, null);
+  }
+
+  /**
+   * Create a new AsyncConnection instance using the passed {@code conf} and 
{@code user}.
+   * AsyncConnection encapsulates all housekeeping for a connection to the 
cluster. All tables and
+   * interfaces created from returned connection share zookeeper connection, 
meta cache, and
+   * connections to region servers and masters.
+   * <p>
+   * The caller is responsible for calling {@link AsyncConnection#close()} on 
the returned
+   * connection instance.
+   * <p>
+   * Usually you should only create one AsyncConnection instance in your code 
and use it everywhere
+   * as it is thread safe.
+   * @param conf                 configuration
+   * @param user                 the user the asynchronous connection is for
+   * @param connectionAttributes attributes to be sent along to server during 
connection establish
+   * @return AsyncConnection object wrapped by CompletableFuture
+   */
+  public static CompletableFuture<AsyncConnection> 
createAsyncConnection(Configuration conf,
+    final User user, Map<String, byte[]> connectionAttributes) {
     return TraceUtil.tracedFuture(() -> {
       CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
       ConnectionRegistry registry = 
ConnectionRegistryFactory.getRegistry(conf);
@@ -300,7 +355,7 @@ public class ConnectionFactory {
         try {
           future.complete(
             user.runAs((PrivilegedExceptionAction<? extends AsyncConnection>) 
() -> ReflectionUtils
-              .newInstance(clazz, conf, registry, clusterId, null, user)));
+              .newInstance(clazz, conf, registry, clusterId, null, user, 
connectionAttributes)));
         } catch (Exception e) {
           registry.close();
           future.completeExceptionally(e);
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java
index 7a7b38a4df6..51368fc23c1 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java
@@ -189,12 +189,13 @@ class ConnectionOverAsyncConnection implements Connection 
{
       public Table build() {
         IOExceptionSupplier<ExecutorService> poolSupplier =
           pool != null ? () -> pool : 
ConnectionOverAsyncConnection.this::getBatchPool;
-        return new TableOverAsyncTable(conn,
+        AsyncTableBuilder<AdvancedScanResultConsumer> tableBuilder =
           conn.getTableBuilder(tableName).setRpcTimeout(rpcTimeout, 
TimeUnit.MILLISECONDS)
             .setReadRpcTimeout(readRpcTimeout, TimeUnit.MILLISECONDS)
             .setWriteRpcTimeout(writeRpcTimeout, TimeUnit.MILLISECONDS)
-            .setOperationTimeout(operationTimeout, 
TimeUnit.MILLISECONDS).build(),
-          poolSupplier);
+            .setOperationTimeout(operationTimeout, TimeUnit.MILLISECONDS);
+        requestAttributes.forEach(tableBuilder::setRequestAttribute);
+        return new TableOverAsyncTable(conn, tableBuilder.build(), 
poolSupplier);
       }
     };
   }
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index ff75c0725ce..342cf89acf1 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -34,6 +34,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -119,6 +120,8 @@ class RawAsyncTableImpl implements 
AsyncTable<AdvancedScanResultConsumer> {
 
   private final int startLogErrorsCnt;
 
+  private final Map<String, byte[]> requestAttributes;
+
   RawAsyncTableImpl(AsyncConnectionImpl conn, Timer retryTimer, 
AsyncTableBuilderBase<?> builder) {
     this.conn = conn;
     this.retryTimer = retryTimer;
@@ -145,6 +148,7 @@ class RawAsyncTableImpl implements 
AsyncTable<AdvancedScanResultConsumer> {
       ? conn.connConf.getMetaScannerCaching()
       : conn.connConf.getScannerCaching();
     this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize();
+    this.requestAttributes = builder.requestAttributes;
   }
 
   @Override
@@ -210,7 +214,8 @@ class RawAsyncTableImpl implements 
AsyncTable<AdvancedScanResultConsumer> {
       .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
       .pause(pauseNs, TimeUnit.NANOSECONDS)
       .pauseForServerOverloaded(pauseNsForServerOverloaded, 
TimeUnit.NANOSECONDS)
-      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
+      .maxAttempts(maxAttempts).setRequestAttributes(requestAttributes)
+      
.startLogErrorsCnt(startLogErrorsCnt).setRequestAttributes(requestAttributes);
   }
 
   private <T, R extends OperationWithAttributes & Row> 
SingleRequestCallerBuilder<T>
@@ -608,7 +613,7 @@ class RawAsyncTableImpl implements 
AsyncTable<AdvancedScanResultConsumer> {
   public void scan(Scan scan, AdvancedScanResultConsumer consumer) {
     new AsyncClientScanner(setDefaultScanConfig(scan), consumer, tableName, 
conn, retryTimer,
       pauseNs, pauseNsForServerOverloaded, maxAttempts, scanTimeoutNs, 
readRpcTimeoutNs,
-      startLogErrorsCnt).start();
+      startLogErrorsCnt, requestAttributes).start();
   }
 
   private long resultSize2CacheSize(long maxResultSize) {
@@ -704,7 +709,8 @@ class RawAsyncTableImpl implements 
AsyncTable<AdvancedScanResultConsumer> {
       .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
       .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, 
TimeUnit.NANOSECONDS)
       .pauseForServerOverloaded(pauseNsForServerOverloaded, 
TimeUnit.NANOSECONDS)
-      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).call();
+      .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt)
+      .setRequestAttributes(requestAttributes).call();
   }
 
   @Override
@@ -732,6 +738,11 @@ class RawAsyncTableImpl implements 
AsyncTable<AdvancedScanResultConsumer> {
     return unit.convert(scanTimeoutNs, TimeUnit.NANOSECONDS);
   }
 
+  @Override
+  public Map<String, byte[]> getRequestAttributes() {
+    return requestAttributes;
+  }
+
   private <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, 
S> stubMaker,
     ServiceCaller<S, R> callable, RegionInfo region, byte[] row) {
     RegionCoprocessorRpcChannelImpl channel = new 
RegionCoprocessorRpcChannelImpl(conn, tableName,
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
index 7feefc831ca..3941c0d1854 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
@@ -751,4 +751,12 @@ public interface Table extends Closeable {
   default long getOperationTimeout(TimeUnit unit) {
     throw new NotImplementedException("Add an implementation!");
   }
+
+  /**
+   * Get the attributes to be submitted with requests
+   * @return map of request attributes
+   */
+  default Map<String, byte[]> getRequestAttributes() {
+    throw new NotImplementedException("Add an implementation!");
+  }
 }
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilder.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilder.java
index 75e16e89a5d..eee985555b3 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilder.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilder.java
@@ -55,6 +55,11 @@ public interface TableBuilder {
    */
   TableBuilder setWriteRpcTimeout(int timeout);
 
+  /**
+   * Set a request attribute
+   */
+  TableBuilder setRequestAttribute(String key, byte[] value);
+
   /**
    * Create the {@link Table} instance.
    */
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilderBase.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilderBase.java
index c74340259f3..dc3111b0c79 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilderBase.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilderBase.java
@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -36,6 +39,8 @@ abstract class TableBuilderBase implements TableBuilder {
 
   protected int writeRpcTimeout;
 
+  protected Map<String, byte[]> requestAttributes = Collections.emptyMap();
+
   TableBuilderBase(TableName tableName, ConnectionConfiguration connConf) {
     if (tableName == null) {
       throw new IllegalArgumentException("Given table name is null");
@@ -73,4 +78,13 @@ abstract class TableBuilderBase implements TableBuilder {
     this.writeRpcTimeout = timeout;
     return this;
   }
+
+  @Override
+  public TableBuilderBase setRequestAttribute(String key, byte[] value) {
+    if (this.requestAttributes.isEmpty()) {
+      this.requestAttributes = new HashMap<>();
+    }
+    this.requestAttributes.put(key, value);
+    return this;
+  }
 }
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java
index e1565f18159..0a7dabd476c 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java
@@ -560,6 +560,11 @@ class TableOverAsyncTable implements Table {
     return table.getOperationTimeout(unit);
   }
 
+  @Override
+  public Map<String, byte[]> getRequestAttributes() {
+    return table.getRequestAttributes();
+  }
+
   @Override
   public RegionLocator getRegionLocator() throws IOException {
     return conn.toConnection().getRegionLocator(getName());
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
index 23d14c272d2..5e42558671b 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
@@ -26,6 +26,7 @@ import io.opentelemetry.context.Scope;
 import java.io.IOException;
 import java.net.SocketAddress;
 import java.util.Collection;
+import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -106,6 +107,7 @@ public abstract class AbstractRpcClient<T extends 
RpcConnection> implements RpcC
   private boolean running = true; // if client runs
 
   protected final Configuration conf;
+  protected final Map<String, byte[]> connectionAttributes;
   protected final String clusterId;
   protected final SocketAddress localAddr;
   protected final MetricsConnection metrics;
@@ -154,7 +156,7 @@ public abstract class AbstractRpcClient<T extends 
RpcConnection> implements RpcC
    * @param metrics   the connection metrics
    */
   public AbstractRpcClient(Configuration conf, String clusterId, SocketAddress 
localAddr,
-    MetricsConnection metrics) {
+    MetricsConnection metrics, Map<String, byte[]> connectionAttributes) {
     this.userProvider = UserProvider.instantiate(conf);
     this.localAddr = localAddr;
     this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true);
@@ -167,6 +169,7 @@ public abstract class AbstractRpcClient<T extends 
RpcConnection> implements RpcC
 
     this.minIdleTimeBeforeClose = conf.getInt(IDLE_TIME, 120000); // 2 minutes
     this.conf = conf;
+    this.connectionAttributes = connectionAttributes;
     this.codec = getCodec();
     this.compressor = getCompressor(conf);
     this.fallbackAllowed = 
conf.getBoolean(IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
@@ -416,23 +419,24 @@ public abstract class AbstractRpcClient<T extends 
RpcConnection> implements RpcC
       }
 
       final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr);
-      Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), 
returnType,
-        hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback<Call>() {
-          @Override
-          public void run(Call call) {
-            try (Scope scope = call.span.makeCurrent()) {
-              counter.decrementAndGet();
-              onCallFinished(call, hrc, addr, callback);
-            } finally {
-              if (hrc.failed()) {
-                TraceUtil.setError(span, hrc.getFailed());
-              } else {
-                span.setStatus(StatusCode.OK);
+      Call call =
+        new Call(nextCallId(), md, param, hrc.cellScanner(), returnType, 
hrc.getCallTimeout(),
+          hrc.getPriority(), hrc.getRequestAttributes(), new 
RpcCallback<Call>() {
+            @Override
+            public void run(Call call) {
+              try (Scope scope = call.span.makeCurrent()) {
+                counter.decrementAndGet();
+                onCallFinished(call, hrc, addr, callback);
+              } finally {
+                if (hrc.failed()) {
+                  TraceUtil.setError(span, hrc.getFailed());
+                } else {
+                  span.setStatus(StatusCode.OK);
+                }
+                span.end();
               }
-              span.end();
             }
-          }
-        }, cs);
+          }, cs);
       ConnectionId remoteId = new ConnectionId(ticket, 
md.getService().getName(), addr);
       int count = counter.incrementAndGet();
       try {
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java
index 7fffdad935f..3da00c5395d 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.ipc;
 
 import java.io.IOException;
 import java.net.SocketAddress;
+import java.util.Collections;
+import java.util.Map;
 import javax.net.SocketFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
@@ -41,7 +43,7 @@ public class BlockingRpcClient extends 
AbstractRpcClient<BlockingRpcConnection>
    * SocketFactory
    */
   BlockingRpcClient(Configuration conf) {
-    this(conf, HConstants.CLUSTER_ID_DEFAULT, null, null);
+    this(conf, HConstants.CLUSTER_ID_DEFAULT, null, null, 
Collections.emptyMap());
   }
 
   /**
@@ -53,8 +55,8 @@ public class BlockingRpcClient extends 
AbstractRpcClient<BlockingRpcConnection>
    * @param metrics   the connection metrics
    */
   public BlockingRpcClient(Configuration conf, String clusterId, SocketAddress 
localAddr,
-    MetricsConnection metrics) {
-    super(conf, clusterId, localAddr, metrics);
+    MetricsConnection metrics, Map<String, byte[]> connectionAttributes) {
+    super(conf, clusterId, localAddr, metrics, connectionAttributes);
     this.socketFactory = NetUtils.getDefaultSocketFactory(conf);
   }
 
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
index d63d14940e7..81ad4d2f056 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
@@ -219,7 +219,7 @@ class BlockingRpcConnection extends RpcConnection 
implements Runnable {
   BlockingRpcConnection(BlockingRpcClient rpcClient, ConnectionId remoteId) 
throws IOException {
     super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, 
rpcClient.clusterId,
       rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, 
rpcClient.compressor,
-      rpcClient.metrics);
+      rpcClient.metrics, rpcClient.connectionAttributes);
     this.rpcClient = rpcClient;
     this.connectionHeaderPreamble = getConnectionHeaderPreamble();
     ConnectionHeader header = getConnectionHeader();
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java
index 3c0e24e5714..669fc73a3bf 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.ipc;
 
 import io.opentelemetry.api.trace.Span;
 import java.io.IOException;
+import java.util.Map;
 import java.util.Optional;
 import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.commons.lang3.builder.ToStringStyle;
@@ -56,14 +57,15 @@ class Call {
   final Descriptors.MethodDescriptor md;
   final int timeout; // timeout in millisecond for this call; 0 means infinite.
   final int priority;
+  final Map<String, byte[]> attributes;
   final MetricsConnection.CallStats callStats;
   private final RpcCallback<Call> callback;
   final Span span;
   Timeout timeoutTask;
 
   Call(int id, final Descriptors.MethodDescriptor md, Message param, final 
CellScanner cells,
-    final Message responseDefaultType, int timeout, int priority, 
RpcCallback<Call> callback,
-    MetricsConnection.CallStats callStats) {
+    final Message responseDefaultType, int timeout, int priority, Map<String, 
byte[]> attributes,
+    RpcCallback<Call> callback, MetricsConnection.CallStats callStats) {
     this.param = param;
     this.md = md;
     this.cells = cells;
@@ -73,6 +75,7 @@ class Call {
     this.id = id;
     this.timeout = timeout;
     this.priority = priority;
+    this.attributes = attributes;
     this.callback = callback;
     this.span = Span.current();
   }
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java
index 9bee88d599f..c752f4c1835 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.ipc;
 
 import java.io.IOException;
+import java.util.Map;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -112,6 +113,16 @@ public class DelegatingHBaseRpcController implements 
HBaseRpcController {
     return delegate.hasCallTimeout();
   }
 
+  @Override
+  public Map<String, byte[]> getRequestAttributes() {
+    return delegate.getRequestAttributes();
+  }
+
+  @Override
+  public void setRequestAttributes(Map<String, byte[]> requestAttributes) {
+    delegate.setRequestAttributes(requestAttributes);
+  }
+
   @Override
   public void setFailed(IOException e) {
     delegate.setFailed(e);
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java
index c60de7658f3..cd303a5eda7 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.ipc;
 
 import java.io.IOException;
+import java.util.Map;
 import org.apache.hadoop.hbase.CellScannable;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@@ -71,6 +72,16 @@ public interface HBaseRpcController extends RpcController, 
CellScannable {
 
   boolean hasCallTimeout();
 
+  /**
+   * Get the map of request attributes
+   */
+  Map<String, byte[]> getRequestAttributes();
+
+  /**
+   * Set the map of request attributes
+   */
+  void setRequestAttributes(Map<String, byte[]> requestAttributes);
+
   /**
    * Set failed with an exception to pass on. For use in async rpc clients
    * @param e exception to set with
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java
index 99ed5c4d48b..425c5e77afc 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java
@@ -19,7 +19,9 @@ package org.apache.hadoop.hbase.ipc;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import org.apache.hadoop.hbase.CellScannable;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
@@ -70,6 +72,8 @@ public class HBaseRpcControllerImpl implements 
HBaseRpcController {
    */
   private CellScanner cellScanner;
 
+  private Map<String, byte[]> requestAttributes = Collections.emptyMap();
+
   public HBaseRpcControllerImpl() {
     this(null, (CellScanner) null);
   }
@@ -166,6 +170,16 @@ public class HBaseRpcControllerImpl implements 
HBaseRpcController {
     return callTimeout != null;
   }
 
+  @Override
+  public Map<String, byte[]> getRequestAttributes() {
+    return requestAttributes;
+  }
+
+  @Override
+  public void setRequestAttributes(Map<String, byte[]> requestAttributes) {
+    this.requestAttributes = requestAttributes;
+  }
+
   @Override
   public synchronized String errorText() {
     if (!done || exception == null) {
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
index b509dcbd27b..d6df6c974cc 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
@@ -25,6 +25,7 @@ import java.lang.reflect.InvocationTargetException;
 import java.net.ConnectException;
 import java.net.SocketTimeoutException;
 import java.nio.channels.ClosedChannelException;
+import java.util.Map;
 import java.util.concurrent.TimeoutException;
 import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -44,10 +45,12 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream;
 import org.apache.hbase.thirdparty.com.google.protobuf.Message;
+import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
 import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
 import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
 import org.apache.hbase.thirdparty.io.netty.util.concurrent.FastThreadLocal;
 
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
@@ -126,6 +129,14 @@ class IPCUtil {
     if (call.priority != HConstants.PRIORITY_UNSET) {
       builder.setPriority(call.priority);
     }
+    if (call.attributes != null && !call.attributes.isEmpty()) {
+      HBaseProtos.NameBytesPair.Builder attributeBuilder = 
HBaseProtos.NameBytesPair.newBuilder();
+      for (Map.Entry<String, byte[]> attribute : call.attributes.entrySet()) {
+        attributeBuilder.setName(attribute.getKey());
+        
attributeBuilder.setValue(UnsafeByteOperations.unsafeWrap(attribute.getValue()));
+        builder.addAttribute(attributeBuilder.build());
+      }
+    }
     builder.setTimeout(call.timeout);
 
     return builder.build();
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java
index 231caa40a89..ed0c4fffc72 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.ipc;
 
 import java.io.IOException;
 import java.net.SocketAddress;
+import java.util.Collections;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@@ -55,7 +57,12 @@ public class NettyRpcClient extends 
AbstractRpcClient<NettyRpcConnection> {
 
   public NettyRpcClient(Configuration configuration, String clusterId, 
SocketAddress localAddress,
     MetricsConnection metrics) {
-    super(configuration, clusterId, localAddress, metrics);
+    this(configuration, clusterId, localAddress, metrics, 
Collections.emptyMap());
+  }
+
+  public NettyRpcClient(Configuration configuration, String clusterId, 
SocketAddress localAddress,
+    MetricsConnection metrics, Map<String, byte[]> connectionAttributes) {
+    super(configuration, clusterId, localAddress, metrics, 
connectionAttributes);
     Pair<EventLoopGroup, Class<? extends Channel>> groupAndChannelClass =
       NettyRpcClientConfigHelper.getEventLoopConfig(conf);
     if (groupAndChannelClass == null) {
@@ -75,7 +82,7 @@ public class NettyRpcClient extends 
AbstractRpcClient<NettyRpcConnection> {
 
   /** Used in test only. */
   public NettyRpcClient(Configuration configuration) {
-    this(configuration, HConstants.CLUSTER_ID_DEFAULT, null, null);
+    this(configuration, HConstants.CLUSTER_ID_DEFAULT, null, null, 
Collections.emptyMap());
   }
 
   @Override
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
index 48104038c21..3f9a58d5126 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
@@ -104,7 +104,7 @@ class NettyRpcConnection extends RpcConnection {
   NettyRpcConnection(NettyRpcClient rpcClient, ConnectionId remoteId) throws 
IOException {
     super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, 
rpcClient.clusterId,
       rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, 
rpcClient.compressor,
-      rpcClient.metrics);
+      rpcClient.metrics, rpcClient.connectionAttributes);
     this.rpcClient = rpcClient;
     this.eventLoop = rpcClient.group.next();
     byte[] connectionHeaderPreamble = getConnectionHeaderPreamble();
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java
index 9b69b523405..f1df572675c 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hbase.ipc;
 
 import java.net.SocketAddress;
+import java.util.Collections;
+import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.MetricsConnection;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
@@ -59,7 +61,7 @@ public final class RpcClientFactory {
    */
   public static RpcClient createClient(Configuration conf, String clusterId,
     MetricsConnection metrics) {
-    return createClient(conf, clusterId, null, metrics);
+    return createClient(conf, clusterId, null, metrics, 
Collections.emptyMap());
   }
 
   private static String getRpcClientClass(Configuration conf) {
@@ -81,10 +83,11 @@ public final class RpcClientFactory {
    * @return newly created RpcClient
    */
   public static RpcClient createClient(Configuration conf, String clusterId,
-    SocketAddress localAddr, MetricsConnection metrics) {
+    SocketAddress localAddr, MetricsConnection metrics, Map<String, byte[]> 
connectionAttributes) {
     String rpcClientClass = getRpcClientClass(conf);
-    return ReflectionUtils.instantiateWithCustomCtor(rpcClientClass, new 
Class[] {
-      Configuration.class, String.class, SocketAddress.class, 
MetricsConnection.class },
-      new Object[] { conf, clusterId, localAddr, metrics });
+    return ReflectionUtils.instantiateWithCustomCtor(
+      rpcClientClass, new Class[] { Configuration.class, String.class, 
SocketAddress.class,
+        MetricsConnection.class, Map.class },
+      new Object[] { conf, clusterId, localAddr, metrics, connectionAttributes 
});
   }
 }
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java
index 912fa4fb065..31698a1a1e8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.ipc;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
@@ -39,11 +40,13 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
 import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
 import org.apache.hbase.thirdparty.io.netty.util.Timeout;
 import org.apache.hbase.thirdparty.io.netty.util.TimerTask;
 
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation;
 
@@ -70,6 +73,7 @@ abstract class RpcConnection {
   protected final CompressionCodec compressor;
 
   protected final MetricsConnection metrics;
+  private final Map<String, byte[]> connectionAttributes;
 
   protected final HashedWheelTimer timeoutTimer;
 
@@ -86,12 +90,13 @@ abstract class RpcConnection {
 
   protected RpcConnection(Configuration conf, HashedWheelTimer timeoutTimer, 
ConnectionId remoteId,
     String clusterId, boolean isSecurityEnabled, Codec codec, CompressionCodec 
compressor,
-    MetricsConnection metrics) throws IOException {
+    MetricsConnection metrics, Map<String, byte[]> connectionAttributes) 
throws IOException {
     this.timeoutTimer = timeoutTimer;
     this.codec = codec;
     this.compressor = compressor;
     this.conf = conf;
     this.metrics = metrics;
+    this.connectionAttributes = connectionAttributes;
     User ticket = remoteId.getTicket();
     this.securityInfo = SecurityInfo.getInfo(remoteId.getServiceName());
     this.useSasl = isSecurityEnabled;
@@ -169,6 +174,14 @@ abstract class RpcConnection {
     if (this.compressor != null) {
       
builder.setCellBlockCompressorClass(this.compressor.getClass().getCanonicalName());
     }
+    if (connectionAttributes != null && !connectionAttributes.isEmpty()) {
+      HBaseProtos.NameBytesPair.Builder attributeBuilder = 
HBaseProtos.NameBytesPair.newBuilder();
+      for (Map.Entry<String, byte[]> attribute : 
connectionAttributes.entrySet()) {
+        attributeBuilder.setName(attribute.getKey());
+        
attributeBuilder.setValue(UnsafeByteOperations.unsafeWrap(attribute.getValue()));
+        builder.addAttribute(attributeBuilder.build());
+      }
+    }
     builder.setVersionInfo(ProtobufUtil.getVersionInfo());
     boolean isCryptoAESEnable = conf.getBoolean(CRYPTO_AES_ENABLED_KEY, 
CRYPTO_AES_ENABLED_DEFAULT);
     // if Crypto AES enable, setup Cipher transformation
diff --git 
a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRpcBasedRegistryHedgedReads.java
 
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRpcBasedRegistryHedgedReads.java
index 6c97c19f96c..54b351f00a3 100644
--- 
a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRpcBasedRegistryHedgedReads.java
+++ 
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRpcBasedRegistryHedgedReads.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertThrows;
 import java.io.IOException;
 import java.net.SocketAddress;
 import java.util.Collections;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
@@ -95,7 +96,7 @@ public class TestRpcBasedRegistryHedgedReads {
   public static final class RpcClientImpl implements RpcClient {
 
     public RpcClientImpl(Configuration configuration, String clusterId, 
SocketAddress localAddress,
-      MetricsConnection metrics) {
+      MetricsConnection metrics, Map<String, byte[]> attributes) {
     }
 
     @Override
diff --git 
a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestTLSHandshadeFailure.java
 
b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestTLSHandshadeFailure.java
index 7375388e4a0..10948358ff9 100644
--- 
a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestTLSHandshadeFailure.java
+++ 
b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestTLSHandshadeFailure.java
@@ -30,6 +30,7 @@ import static org.mockito.Mockito.when;
 import java.io.IOException;
 import java.net.ServerSocket;
 import java.net.Socket;
+import java.util.Collections;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.hadoop.conf.Configuration;
@@ -148,7 +149,8 @@ public class TestTLSHandshadeFailure {
       Address.fromParts("127.0.0.1", server.getLocalPort()));
     NettyRpcConnection conn = client.createConnection(id);
     BlockingRpcCallback<Call> done = new BlockingRpcCallback<>();
-    Call call = new Call(1, null, null, null, null, 0, 0, done, new 
CallStats());
+    Call call =
+      new Call(1, null, null, null, null, 0, 0, Collections.emptyMap(), done, 
new CallStats());
     HBaseRpcController hrc = new HBaseRpcControllerImpl();
     conn.sendRequest(call, hrc);
     done.get();
diff --git 
a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
 
b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
index 51e9e1e7755..fc7f66129d3 100644
--- 
a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
+++ 
b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
@@ -1667,9 +1667,10 @@ public class TestHFileOutputFormat2 {
 
     private final Connection delegate;
 
-    public ConfigurationCaptorConnection(Configuration conf, ExecutorService 
es, User user)
-      throws IOException {
-      delegate = FutureUtils.get(createAsyncConnection(conf, 
user)).toConnection();
+    public ConfigurationCaptorConnection(Configuration conf, ExecutorService 
es, User user,
+      Map<String, byte[]> connectionAttributes) throws IOException {
+      delegate =
+        FutureUtils.get(createAsyncConnection(conf, user, 
connectionAttributes)).toConnection();
 
       final String uuid = conf.get(UUID_KEY);
       if (uuid != null) {
diff --git 
a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormatBase.java
 
b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormatBase.java
index 7d099aa44e2..0c879bd5ace 100644
--- 
a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormatBase.java
+++ 
b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormatBase.java
@@ -123,7 +123,8 @@ public class TestMultiTableInputFormatBase {
     private final Configuration configuration;
     static final AtomicInteger creations = new AtomicInteger(0);
 
-    MRSplitsConnection(Configuration conf, ExecutorService pool, User user) 
throws IOException {
+    MRSplitsConnection(Configuration conf, ExecutorService pool, User user,
+      Map<String, byte[]> connectionAttributes) throws IOException {
       this.configuration = conf;
       creations.incrementAndGet();
     }
diff --git 
a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatBase.java
 
b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatBase.java
index 13e3831f6df..f41282b8f4f 100644
--- 
a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatBase.java
+++ 
b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatBase.java
@@ -212,8 +212,8 @@ public class TestTableInputFormatBase {
       SIZE_MAP.put(Bytes.toBytes("p"), 200L * 1024L * 1024L);
     }
 
-    ConnectionForMergeTesting(Configuration conf, ExecutorService pool, User 
user)
-      throws IOException {
+    ConnectionForMergeTesting(Configuration conf, ExecutorService pool, User 
user,
+      Map<String, byte[]> connectionAttributes) throws IOException {
     }
 
     @Override
diff --git a/hbase-protocol-shaded/src/main/protobuf/rpc/RPC.proto 
b/hbase-protocol-shaded/src/main/protobuf/rpc/RPC.proto
index 6426f0cb06c..e992e681fbf 100644
--- a/hbase-protocol-shaded/src/main/protobuf/rpc/RPC.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/rpc/RPC.proto
@@ -92,6 +92,7 @@ message ConnectionHeader {
   optional VersionInfo version_info = 5;
   // the transformation for rpc AES encryption with Apache Commons Crypto
   optional string rpc_crypto_cipher_transformation = 6;
+  repeated NameBytesPair attribute = 7;
 }
 
 // This is sent by rpc server to negotiate the data if necessary
@@ -148,6 +149,7 @@ message RequestHeader {
   // See HConstants.
   optional uint32 priority = 6;
   optional uint32 timeout = 7;
+  repeated NameBytesPair attribute = 8;
 }
 
 message ResponseHeader {
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
index 1dda6c32ca0..e2c11ab1d5e 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.client;
 
 import java.net.SocketAddress;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
@@ -59,7 +60,7 @@ class AsyncClusterConnectionImpl extends AsyncConnectionImpl 
implements AsyncClu
 
   public AsyncClusterConnectionImpl(Configuration conf, ConnectionRegistry 
registry,
     String clusterId, SocketAddress localAddress, User user) {
-    super(conf, registry, clusterId, localAddress, user);
+    super(conf, registry, clusterId, localAddress, user, 
Collections.emptyMap());
   }
 
   @Override
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicationRetryingCaller.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicationRetryingCaller.java
index 02718145c9b..e2b45fe30c3 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicationRetryingCaller.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicationRetryingCaller.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client;
 import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -54,7 +55,8 @@ public class AsyncRegionReplicationRetryingCaller extends 
AsyncRpcRetryingCaller
     RegionInfo replica, List<Entry> entries) {
     super(retryTimer, conn, ConnectionUtils.getPriority(replica.getTable()),
       conn.connConf.getPauseNs(), 
conn.connConf.getPauseNsForServerOverloaded(), maxAttempts,
-      operationTimeoutNs, rpcTimeoutNs, conn.connConf.getStartLogErrorsCnt());
+      operationTimeoutNs, rpcTimeoutNs, conn.connConf.getStartLogErrorsCnt(),
+      Collections.emptyMap());
     this.replica = replica;
     this.entries = entries.toArray(new Entry[0]);
   }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java
index 197ddb71d7e..cc97a39c7ee 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java
@@ -27,6 +27,7 @@ import 
org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
 import 
org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
 import org.apache.hbase.thirdparty.com.google.protobuf.Message;
 
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
 
 /**
@@ -82,6 +83,8 @@ public interface RpcCall extends RpcCallContext {
   /** Returns The request header of this call. */
   RequestHeader getHeader();
 
+  ConnectionHeader getConnectionHeader();
+
   /** Returns Port of remote address in this call */
   int getRemotePort();
 
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
index 2188795914d..f3568a36f14 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
@@ -49,6 +49,7 @@ import 
org.apache.hbase.thirdparty.com.google.protobuf.Message;
 
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
@@ -207,6 +208,11 @@ public abstract class ServerCall<T extends 
ServerRpcConnection> implements RpcCa
     return this.header;
   }
 
+  @Override
+  public RPCProtos.ConnectionHeader getConnectionHeader() {
+    return this.connection.connectionHeader;
+  }
+
   @Override
   public int getPriority() {
     return this.header.getPriority();
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncTable.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncTable.java
index a87babad0d2..45e59def721 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncTable.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncTable.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.client;
 
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
@@ -77,6 +78,11 @@ public class DummyAsyncTable<C extends 
ScanResultConsumerBase> implements AsyncT
     return 0;
   }
 
+  @Override
+  public Map<String, byte[]> getRequestAttributes() {
+    return null;
+  }
+
   @Override
   public CompletableFuture<Result> get(Get get) {
     return null;
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
index 65def75fff1..d358695c5f9 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue;
 
 import java.net.SocketAddress;
 import java.net.SocketTimeoutException;
+import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.conf.Configuration;
@@ -129,8 +130,8 @@ public class TestClientTimeouts {
    */
   public static class RandomTimeoutRpcClient extends BlockingRpcClient {
     public RandomTimeoutRpcClient(Configuration conf, String clusterId, 
SocketAddress localAddr,
-      MetricsConnection metrics) {
-      super(conf, clusterId, localAddr, metrics);
+      MetricsConnection metrics, Map<String, byte[]> connectionAttributes) {
+      super(conf, clusterId, localAddr, metrics, connectionAttributes);
     }
 
     // Return my own instance, one that does random timeouts
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRequestAndConnectionAttributes.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRequestAndConnectionAttributes.java
new file mode 100644
index 00000000000..b376bfc1855
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRequestAndConnectionAttributes.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.AuthUtil;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.ipc.RpcCall;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+
+@Category({ ClientTests.class, MediumTests.class })
+public class TestRequestAndConnectionAttributes {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestRequestAndConnectionAttributes.class);
+
+  private static final Map<String, byte[]> CONNECTION_ATTRIBUTES = new 
HashMap<>();
+  static {
+    CONNECTION_ATTRIBUTES.put("clientId", Bytes.toBytes("foo"));
+  }
+  private static final Map<String, byte[]> REQUEST_ATTRIBUTES = new 
HashMap<>();
+  private static final ExecutorService EXECUTOR_SERVICE = 
Executors.newFixedThreadPool(100);
+  private static final AtomicBoolean REQUEST_ATTRIBUTES_VALIDATED = new 
AtomicBoolean(false);
+  private static final byte[] REQUEST_ATTRIBUTES_TEST_TABLE_CF = 
Bytes.toBytes("0");
+  private static final TableName REQUEST_ATTRIBUTES_TEST_TABLE =
+    TableName.valueOf("testRequestAttributes");
+
+  private static HBaseTestingUtil TEST_UTIL = null;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    TEST_UTIL = new HBaseTestingUtil();
+    TEST_UTIL.startMiniCluster(1);
+    TEST_UTIL.createTable(REQUEST_ATTRIBUTES_TEST_TABLE,
+      new byte[][] { REQUEST_ATTRIBUTES_TEST_TABLE_CF }, 1, 
HConstants.DEFAULT_BLOCKSIZE,
+      AttributesCoprocessor.class.getName());
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Before
+  public void setup() {
+    REQUEST_ATTRIBUTES_VALIDATED.getAndSet(false);
+  }
+
+  @Test
+  public void testConnectionAttributes() throws IOException {
+    TableName tableName = TableName.valueOf("testConnectionAttributes");
+    TEST_UTIL.createTable(tableName, new byte[][] { Bytes.toBytes("0") }, 1,
+      HConstants.DEFAULT_BLOCKSIZE, AttributesCoprocessor.class.getName());
+
+    Configuration conf = TEST_UTIL.getConfiguration();
+    try (Connection conn = ConnectionFactory.createConnection(conf, null,
+      AuthUtil.loginClient(conf), CONNECTION_ATTRIBUTES); Table table = 
conn.getTable(tableName)) {
+      Result result = table.get(new Get(Bytes.toBytes(0)));
+      assertEquals(CONNECTION_ATTRIBUTES.size(), result.size());
+      for (Map.Entry<String, byte[]> attr : CONNECTION_ATTRIBUTES.entrySet()) {
+        byte[] val = result.getValue(Bytes.toBytes("c"), 
Bytes.toBytes(attr.getKey()));
+        assertEquals(Bytes.toStringBinary(attr.getValue()), 
Bytes.toStringBinary(val));
+      }
+    }
+  }
+
+  @Test
+  public void testRequestAttributesGet() throws IOException {
+    addRandomRequestAttributes();
+
+    Configuration conf = TEST_UTIL.getConfiguration();
+    try (
+      Connection conn = ConnectionFactory.createConnection(conf, null, 
AuthUtil.loginClient(conf),
+        CONNECTION_ATTRIBUTES);
+      Table table = configureRequestAttributes(
+        conn.getTableBuilder(REQUEST_ATTRIBUTES_TEST_TABLE, 
EXECUTOR_SERVICE)).build()) {
+
+      table.get(new Get(Bytes.toBytes(0)));
+    }
+
+    assertTrue(REQUEST_ATTRIBUTES_VALIDATED.get());
+  }
+
+  @Test
+  public void testRequestAttributesMultiGet() throws IOException {
+    assertFalse(REQUEST_ATTRIBUTES_VALIDATED.get());
+    addRandomRequestAttributes();
+
+    Configuration conf = TEST_UTIL.getConfiguration();
+    try (
+      Connection conn = ConnectionFactory.createConnection(conf, null, 
AuthUtil.loginClient(conf),
+        CONNECTION_ATTRIBUTES);
+      Table table = configureRequestAttributes(
+        conn.getTableBuilder(REQUEST_ATTRIBUTES_TEST_TABLE, 
EXECUTOR_SERVICE)).build()) {
+      List<Get> gets = ImmutableList.of(new Get(Bytes.toBytes(0)), new 
Get(Bytes.toBytes(1)));
+      table.get(gets);
+    }
+
+    assertTrue(REQUEST_ATTRIBUTES_VALIDATED.get());
+  }
+
+  @Test
+  public void testRequestAttributesExists() throws IOException {
+    assertFalse(REQUEST_ATTRIBUTES_VALIDATED.get());
+    addRandomRequestAttributes();
+
+    Configuration conf = TEST_UTIL.getConfiguration();
+    try (
+      Connection conn = ConnectionFactory.createConnection(conf, null, 
AuthUtil.loginClient(conf),
+        CONNECTION_ATTRIBUTES);
+      Table table = configureRequestAttributes(
+        conn.getTableBuilder(REQUEST_ATTRIBUTES_TEST_TABLE, 
EXECUTOR_SERVICE)).build()) {
+
+      table.exists(new Get(Bytes.toBytes(0)));
+    }
+
+    assertTrue(REQUEST_ATTRIBUTES_VALIDATED.get());
+  }
+
+  @Test
+  public void testRequestAttributesScan() throws IOException {
+    assertFalse(REQUEST_ATTRIBUTES_VALIDATED.get());
+    addRandomRequestAttributes();
+
+    Configuration conf = TEST_UTIL.getConfiguration();
+    try (
+      Connection conn = ConnectionFactory.createConnection(conf, null, 
AuthUtil.loginClient(conf),
+        CONNECTION_ATTRIBUTES);
+      Table table = configureRequestAttributes(
+        conn.getTableBuilder(REQUEST_ATTRIBUTES_TEST_TABLE, 
EXECUTOR_SERVICE)).build()) {
+      ResultScanner scanner = table.getScanner(new Scan());
+      scanner.next();
+    }
+    assertTrue(REQUEST_ATTRIBUTES_VALIDATED.get());
+  }
+
+  @Test
+  public void testRequestAttributesPut() throws IOException {
+    assertFalse(REQUEST_ATTRIBUTES_VALIDATED.get());
+    addRandomRequestAttributes();
+
+    Configuration conf = TEST_UTIL.getConfiguration();
+    try (
+      Connection conn = ConnectionFactory.createConnection(conf, null, 
AuthUtil.loginClient(conf),
+        CONNECTION_ATTRIBUTES);
+      Table table = configureRequestAttributes(
+        conn.getTableBuilder(REQUEST_ATTRIBUTES_TEST_TABLE, 
EXECUTOR_SERVICE)).build()) {
+      Put put = new Put(Bytes.toBytes("a"));
+      put.addColumn(REQUEST_ATTRIBUTES_TEST_TABLE_CF, Bytes.toBytes("c"), 
Bytes.toBytes("v"));
+      table.put(put);
+    }
+    assertTrue(REQUEST_ATTRIBUTES_VALIDATED.get());
+  }
+
+  @Test
+  public void testRequestAttributesMultiPut() throws IOException {
+    assertFalse(REQUEST_ATTRIBUTES_VALIDATED.get());
+    addRandomRequestAttributes();
+
+    Configuration conf = TEST_UTIL.getConfiguration();
+    try (
+      Connection conn = ConnectionFactory.createConnection(conf, null, 
AuthUtil.loginClient(conf),
+        CONNECTION_ATTRIBUTES);
+      Table table = configureRequestAttributes(
+        conn.getTableBuilder(REQUEST_ATTRIBUTES_TEST_TABLE, 
EXECUTOR_SERVICE)).build()) {
+      Put put = new Put(Bytes.toBytes("a"));
+      put.addColumn(REQUEST_ATTRIBUTES_TEST_TABLE_CF, Bytes.toBytes("c"), 
Bytes.toBytes("v"));
+      table.put(put);
+    }
+    assertTrue(REQUEST_ATTRIBUTES_VALIDATED.get());
+  }
+
+  @Test
+  public void testNoRequestAttributes() throws IOException {
+    assertFalse(REQUEST_ATTRIBUTES_VALIDATED.get());
+    TableName tableName = TableName.valueOf("testNoRequestAttributesScan");
+    TEST_UTIL.createTable(tableName, new byte[][] { Bytes.toBytes("0") }, 1,
+      HConstants.DEFAULT_BLOCKSIZE, AttributesCoprocessor.class.getName());
+
+    REQUEST_ATTRIBUTES.clear();
+    Configuration conf = TEST_UTIL.getConfiguration();
+    try (Connection conn = ConnectionFactory.createConnection(conf, null,
+      AuthUtil.loginClient(conf), CONNECTION_ATTRIBUTES)) {
+      TableBuilder tableBuilder = conn.getTableBuilder(tableName, null);
+      try (Table table = tableBuilder.build()) {
+        table.get(new Get(Bytes.toBytes(0)));
+        assertTrue(REQUEST_ATTRIBUTES_VALIDATED.get());
+      }
+    }
+  }
+
+  private void addRandomRequestAttributes() {
+    REQUEST_ATTRIBUTES.clear();
+    int j = Math.max(2, (int) (10 * Math.random()));
+    for (int i = 0; i < j; i++) {
+      REQUEST_ATTRIBUTES.put(String.valueOf(i), 
Bytes.toBytes(UUID.randomUUID().toString()));
+    }
+  }
+
+  private static TableBuilder configureRequestAttributes(TableBuilder 
tableBuilder) {
+    REQUEST_ATTRIBUTES.forEach(tableBuilder::setRequestAttribute);
+    return tableBuilder;
+  }
+
+  public static class AttributesCoprocessor implements RegionObserver, 
RegionCoprocessor {
+
+    @Override
+    public Optional<RegionObserver> getRegionObserver() {
+      return Optional.of(this);
+    }
+
+    @Override
+    public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get 
get,
+      List<Cell> result) throws IOException {
+      validateRequestAttributes();
+
+      // for connection attrs test
+      RpcCall rpcCall = RpcServer.getCurrentCall().get();
+      for (HBaseProtos.NameBytesPair attr : 
rpcCall.getHeader().getAttributeList()) {
+        
result.add(c.getEnvironment().getCellBuilder().clear().setRow(get.getRow())
+          
.setFamily(Bytes.toBytes("r")).setQualifier(Bytes.toBytes(attr.getName()))
+          
.setValue(attr.getValue().toByteArray()).setType(Cell.Type.Put).setTimestamp(1).build());
+      }
+      for (HBaseProtos.NameBytesPair attr : 
rpcCall.getConnectionHeader().getAttributeList()) {
+        
result.add(c.getEnvironment().getCellBuilder().clear().setRow(get.getRow())
+          
.setFamily(Bytes.toBytes("c")).setQualifier(Bytes.toBytes(attr.getName()))
+          
.setValue(attr.getValue().toByteArray()).setType(Cell.Type.Put).setTimestamp(1).build());
+      }
+      result.sort(CellComparator.getInstance());
+      c.bypass();
+    }
+
+    @Override
+    public boolean 
preScannerNext(ObserverContext<RegionCoprocessorEnvironment> c,
+      InternalScanner s, List<Result> result, int limit, boolean hasNext) 
throws IOException {
+      validateRequestAttributes();
+      return hasNext;
+    }
+
+    @Override
+    public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put 
put, WALEdit edit)
+      throws IOException {
+      validateRequestAttributes();
+    }
+
+    private void validateRequestAttributes() {
+      RpcCall rpcCall = RpcServer.getCurrentCall().get();
+      List<HBaseProtos.NameBytesPair> attrs = 
rpcCall.getHeader().getAttributeList();
+      if (attrs.size() != REQUEST_ATTRIBUTES.size()) {
+        return;
+      }
+      for (HBaseProtos.NameBytesPair attr : attrs) {
+        if (!REQUEST_ATTRIBUTES.containsKey(attr.getName())) {
+          return;
+        }
+        if (!Arrays.equals(REQUEST_ATTRIBUTES.get(attr.getName()), 
attr.getValue().toByteArray())) {
+          return;
+        }
+      }
+      REQUEST_ATTRIBUTES_VALIDATED.getAndSet(true);
+    }
+  }
+}
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java
index f36fef186f0..feaf44e0b84 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.fail;
 import java.io.IOException;
 import java.net.Socket;
 import java.net.SocketAddress;
+import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.hadoop.conf.Configuration;
@@ -71,8 +72,8 @@ public class TestRpcClientLeaks {
     }
 
     public MyRpcClientImpl(Configuration conf, String clusterId, SocketAddress 
address,
-      MetricsConnection metrics) {
-      super(conf, clusterId, address, metrics);
+      MetricsConnection metrics, Map<String, byte[]> connectionAttributes) {
+      super(conf, clusterId, address, metrics, connectionAttributes);
     }
 
     @Override
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServerSlowConnectionSetup.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServerSlowConnectionSetup.java
index e14b710647d..80b3845d668 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServerSlowConnectionSetup.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServerSlowConnectionSetup.java
@@ -124,7 +124,7 @@ public class TestRpcServerSlowConnectionSetup {
     int callId = 10;
     Call call = new Call(callId, 
TestProtobufRpcProto.getDescriptor().findMethodByName("ping"),
       EmptyRequestProto.getDefaultInstance(), null, 
EmptyResponseProto.getDefaultInstance(), 1000,
-      HConstants.NORMAL_QOS, null, MetricsConnection.newCallStats());
+      HConstants.NORMAL_QOS, null, null, MetricsConnection.newCallStats());
     RequestHeader requestHeader = IPCUtil.buildRequestHeader(call, null);
     dos.writeInt(IPCUtil.getTotalSizeWhenWrittenDelimited(requestHeader, 
call.param));
     requestHeader.writeDelimitedTo(dos);
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java
index 909e7fdb7f3..7a3ca0b7cf9 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java
@@ -632,6 +632,7 @@ public class TestNamedQueueRecorder {
     return getRpcCall(userName, Optional.of(forcedParamIndex));
   }
 
+  @SuppressWarnings("checkstyle:methodlength")
   private static RpcCall getRpcCall(String userName, Optional<Integer> 
forcedParamIndex) {
     RpcCall rpcCall = new RpcCall() {
       @Override
@@ -666,7 +667,6 @@ public class TestNamedQueueRecorder {
 
       @Override
       public void setStartTime(long startTime) {
-
       }
 
       @Override
@@ -694,6 +694,11 @@ public class TestNamedQueueRecorder {
         return null;
       }
 
+      @Override
+      public RPCProtos.ConnectionHeader getConnectionHeader() {
+        return null;
+      }
+
       @Override
       public int getRemotePort() {
         return 0;
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java
index d26870b77df..dd49d00ac3a 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java
@@ -158,6 +158,7 @@ public class TestRegionProcedureStore extends 
RegionProcedureStoreTestBase {
     RpcServer.setCurrentCall(null);
   }
 
+  @SuppressWarnings("checkstyle:methodlength")
   private RpcCall newRpcCallWithDeadline() {
     return new RpcCall() {
       @Override
@@ -220,6 +221,11 @@ public class TestRegionProcedureStore extends 
RegionProcedureStoreTestBase {
         return null;
       }
 
+      @Override
+      public RPCProtos.ConnectionHeader getConnectionHeader() {
+        return null;
+      }
+
       @Override
       public int getRemotePort() {
         return 0;
diff --git 
a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftConnection.java
 
b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftConnection.java
index 250b8a74f03..db1b1e1c987 100644
--- 
a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftConnection.java
+++ 
b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftConnection.java
@@ -89,8 +89,8 @@ public class ThriftConnection implements Connection {
   private int operationTimeout;
   private int connectTimeout;
 
-  public ThriftConnection(Configuration conf, ExecutorService pool, final User 
user)
-    throws IOException {
+  public ThriftConnection(Configuration conf, ExecutorService pool, final User 
user,
+    Map<String, byte[]> connectionAttributes) throws IOException {
     this.conf = conf;
     this.user = user;
     this.host = conf.get(Constants.HBASE_THRIFT_SERVER_NAME);
@@ -322,6 +322,11 @@ public class ThriftConnection implements Connection {
         return this;
       }
 
+      @Override
+      public TableBuilder setRequestAttribute(String key, byte[] value) {
+        return this;
+      }
+
       @Override
       public Table build() {
         try {


Reply via email to