This is an automated email from the ASF dual-hosted git repository.

zhangmeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 72541ea  Track failures for async read/write in ZkClient (#1663)
72541ea is described below

commit 72541eac1d49294ea1c126e4c359b1611c0dce71
Author: Ramin Bashizade <[email protected]>
AuthorDate: Fri Mar 19 16:11:56 2021 -0700

    Track failures for async read/write in ZkClient (#1663)
    
    * Defines separate metrics for async (batch) operations and tracks the
      failure count by checking the operation return code in ZkAsyncCallbacks.
      Also cancels async get/exists/delete callbacks when an exception is
      thrown, so that the failure can be detected and the metric recorded.
      Finally, TestRawZkClient used sync read metrics to check async reads;
      this is fixed to reflect the separation of sync and async metrics.
---
 .../apache/helix/zookeeper/zkclient/ZkClient.java  | 102 ++++++----
 .../callback/ZkAsyncCallMonitorContext.java        |  18 +-
 .../zkclient/callback/ZkAsyncCallbacks.java        |  25 ++-
 .../zookeeper/zkclient/metric/ZkClientMonitor.java |  26 +++
 .../zkclient/metric/ZkClientPathMonitor.java       |  58 ++++++
 .../zookeeper/impl/client/TestRawZkClient.java     |  16 +-
 .../impl/client/TestZkClientAsyncRetry.java        | 210 +++++++++++++++++++--
 7 files changed, 396 insertions(+), 59 deletions(-)

diff --git 
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java 
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
index 7cef694..268910b 100644
--- 
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
+++ 
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
@@ -1266,13 +1266,20 @@ public class ZkClient implements Watcher {
 
   private void doAsyncSync(final ZooKeeper zk, final String path, final long 
startT,
       final ZkAsyncCallbacks.SyncCallbackHandler cb) {
-    zk.sync(path, cb,
-        new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor, 
startT, 0, true) {
-          @Override
-          protected void doRetry() throws Exception {
-            doAsyncSync(zk, path, System.currentTimeMillis(), cb);
-          }
-        });
+    try {
+      zk.sync(path, cb,
+          new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor, 
startT, 0, true) {
+            @Override
+            protected void doRetry() throws Exception {
+              doAsyncSync(zk, path, System.currentTimeMillis(), cb);
+            }
+          });
+    } catch (RuntimeException e) {
+      // Process callback to release caller from waiting
+      cb.processResult(ZkAsyncCallbacks.UNKNOWN_RET_CODE, path,
+          new ZkAsyncCallMonitorContext(_monitor, startT, 0, true));
+      throw e;
+    }
   }
 
 
@@ -2001,44 +2008,65 @@ public class ZkClient implements Watcher {
 
   public void asyncGetData(final String path, final 
ZkAsyncCallbacks.GetDataCallbackHandler cb) {
     final long startT = System.currentTimeMillis();
-    retryUntilConnected(() -> {
-      ((ZkConnection) getConnection()).getZookeeper().getData(path, null, cb,
-          new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor, 
startT, 0, true) {
-            @Override
-            protected void doRetry() {
-              asyncGetData(path, cb);
-            }
-          });
-      return null;
-    });
+    try {
+      retryUntilConnected(() -> {
+        ((ZkConnection) getConnection()).getZookeeper().getData(path, null, cb,
+            new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor, 
startT, 0, true) {
+              @Override
+              protected void doRetry() {
+                asyncGetData(path, cb);
+              }
+            });
+        return null;
+      });
+    } catch (RuntimeException e) {
+      // Process callback to release caller from waiting
+      cb.processResult(ZkAsyncCallbacks.UNKNOWN_RET_CODE, path,
+          new ZkAsyncCallMonitorContext(_monitor, startT, 0, true), null, 
null);
+      throw e;
+    }
   }
 
   public void asyncExists(final String path, final 
ZkAsyncCallbacks.ExistsCallbackHandler cb) {
     final long startT = System.currentTimeMillis();
-    retryUntilConnected(() -> {
-      ((ZkConnection) getConnection()).getZookeeper().exists(path, null, cb,
-          new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor, 
startT, 0, true) {
-            @Override
-            protected void doRetry() {
-              asyncExists(path, cb);
-            }
-          });
-      return null;
-    });
+    try {
+      retryUntilConnected(() -> {
+        ((ZkConnection) getConnection()).getZookeeper().exists(path, null, cb,
+            new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor, 
startT, 0, true) {
+              @Override
+              protected void doRetry() {
+                asyncExists(path, cb);
+              }
+            });
+        return null;
+      });
+    } catch (RuntimeException e) {
+      // Process callback to release caller from waiting
+      cb.processResult(ZkAsyncCallbacks.UNKNOWN_RET_CODE, path,
+          new ZkAsyncCallMonitorContext(_monitor, startT, 0, true), null);
+      throw e;
+    }
   }
 
   public void asyncDelete(final String path, final 
ZkAsyncCallbacks.DeleteCallbackHandler cb) {
     final long startT = System.currentTimeMillis();
-    retryUntilConnected(() -> {
-      ((ZkConnection) getConnection()).getZookeeper().delete(path, -1, cb,
-          new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor, 
startT, 0, false) {
-            @Override
-            protected void doRetry() {
-              asyncDelete(path, cb);
-            }
-          });
-      return null;
-    });
+    try {
+      retryUntilConnected(() -> {
+        ((ZkConnection) getConnection()).getZookeeper().delete(path, -1, cb,
+            new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor, 
startT, 0, false) {
+              @Override
+              protected void doRetry() {
+                asyncDelete(path, cb);
+              }
+            });
+        return null;
+      });
+    } catch (RuntimeException e) {
+      // Process callback to release caller from waiting
+      cb.processResult(ZkAsyncCallbacks.UNKNOWN_RET_CODE, path,
+          new ZkAsyncCallMonitorContext(_monitor, startT, 0, false));
+      throw e;
+    }
   }
 
   private void checkDataSizeLimit(String path, byte[] data) {
diff --git 
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallMonitorContext.java
 
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallMonitorContext.java
index e73513c..ce691e9 100644
--- 
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallMonitorContext.java
+++ 
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallMonitorContext.java
@@ -56,9 +56,23 @@ public class ZkAsyncCallMonitorContext {
   void recordAccess(String path) {
     if (_monitor != null) {
       if (_isRead) {
-        _monitor.record(path, _bytes, _startTimeMilliSec, 
ZkClientMonitor.AccessType.READ);
+        _monitor.recordAsync(path, _bytes, _startTimeMilliSec, 
ZkClientMonitor.AccessType.READ);
       } else {
-        _monitor.record(path, _bytes, _startTimeMilliSec, 
ZkClientMonitor.AccessType.WRITE);
+        _monitor.recordAsync(path, _bytes, _startTimeMilliSec, 
ZkClientMonitor.AccessType.WRITE);
+      }
+    }
+  }
+
+  /**
+   * Record the operation failure into the specified ZkClient monitor.
+   * @param path The monitored path
+   */
+  void recordFailure(String path) {
+    if (_monitor != null) {
+      if (_isRead) {
+        _monitor.recordAsyncFailure(path, ZkClientMonitor.AccessType.READ);
+      } else {
+        _monitor.recordAsyncFailure(path, ZkClientMonitor.AccessType.WRITE);
       }
     }
   }
diff --git 
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java
 
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java
index db52e14..fc1d14d 100644
--- 
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java
+++ 
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java
@@ -58,6 +58,13 @@ public class ZkAsyncCallbacks {
     public void handle() {
       // TODO Auto-generated method stub
     }
+
+    @Override
+    protected void recordFailure(int rc, String path, 
ZkAsyncCallMonitorContext monitor) {
+      if(rc != Code.NONODE.intValue()) {
+        monitor.recordFailure(path);
+      }
+    }
   }
 
   public static class SetDataCallbackHandler extends DefaultCallback 
implements StatCallback {
@@ -96,6 +103,13 @@ public class ZkAsyncCallbacks {
     public void handle() {
       // TODO Auto-generated method stub
     }
+
+    @Override
+    protected void recordFailure(int rc, String path, 
ZkAsyncCallMonitorContext monitor) {
+      if(rc != Code.NONODE.intValue()) {
+        monitor.recordFailure(path);
+      }
+    }
   }
 
   public static class CreateCallbackHandler extends DefaultCallback implements 
StringCallback {
@@ -170,7 +184,12 @@ public class ZkAsyncCallbacks {
       }
 
       if (ctx != null && ctx instanceof ZkAsyncCallMonitorContext) {
-        ((ZkAsyncCallMonitorContext) ctx).recordAccess(path);
+        ZkAsyncCallMonitorContext monitor = (ZkAsyncCallMonitorContext) ctx;
+        if(rc == 0) {
+          monitor.recordAccess(path);
+        } else {
+          recordFailure(rc, path, monitor);
+        }
       }
 
       _rc = rc;
@@ -206,6 +225,10 @@ public class ZkAsyncCallbacks {
       }
     }
 
+    protected void recordFailure(int rc, String path, 
ZkAsyncCallMonitorContext monitor) {
+      monitor.recordFailure(path);
+    }
+
     public boolean isOperationDone() {
       return _isOperationDone.get();
     }
diff --git 
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/metric/ZkClientMonitor.java
 
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/metric/ZkClientMonitor.java
index 6bfc747..4ce298f 100644
--- 
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/metric/ZkClientMonitor.java
+++ 
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/metric/ZkClientMonitor.java
@@ -245,6 +245,32 @@ public class ZkClientMonitor extends DynamicMBeanProvider {
     }
   }
 
+  /**
+   * Records metrics for async operations
+   */
+  private void recordAsync(String path, int bytes, long latencyMilliSec, 
boolean isFailure,
+      AccessType accessType) {
+    if (null == path) {
+      return;
+    }
+    Arrays.stream(ZkClientPathMonitor.PredefinedPath.values())
+        .filter(predefinedPath -> predefinedPath.match(path))
+        .forEach(predefinedPath -> {
+          ZkClientPathMonitor zkClientPathMonitor = 
_zkClientPathMonitorMap.get(predefinedPath);
+          if (zkClientPathMonitor != null) {
+            zkClientPathMonitor.recordAsync(bytes, latencyMilliSec, isFailure, 
accessType);
+          }
+        });
+  }
+
+  public void recordAsync(String path, int dataSize, long startTimeMilliSec, 
AccessType accessType) {
+    recordAsync(path, dataSize, System.currentTimeMillis() - 
startTimeMilliSec, false, accessType);
+  }
+
+  public void recordAsyncFailure(String path, AccessType accessType) {
+    recordAsync(path, 0, 0, true, accessType);
+  }
+
   class ZkThreadMetric extends DynamicMetric<ZkEventThread, ZkEventThread> {
     public ZkThreadMetric(ZkEventThread eventThread) {
       super("ZkEventThead", eventThread);
diff --git 
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/metric/ZkClientPathMonitor.java
 
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/metric/ZkClientPathMonitor.java
index 1ab2653..59643f1 100644
--- 
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/metric/ZkClientPathMonitor.java
+++ 
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/metric/ZkClientPathMonitor.java
@@ -69,10 +69,14 @@ public class ZkClientPathMonitor extends 
DynamicMBeanProvider {
     ReadTotalLatencyCounter,
     WriteFailureCounter,
     ReadFailureCounter,
+    WriteAsyncFailureCounter,
+    ReadAsyncFailureCounter,
     WriteBytesCounter,
     ReadBytesCounter,
     WriteCounter,
     ReadCounter,
+    WriteAsyncCounter,
+    ReadAsyncCounter,
     ReadLatencyGauge,
     WriteLatencyGauge,
     ReadBytesGauge,
@@ -91,10 +95,14 @@ public class ZkClientPathMonitor extends 
DynamicMBeanProvider {
 
   private SimpleDynamicMetric<Long> _readCounter;
   private SimpleDynamicMetric<Long> _writeCounter;
+  private SimpleDynamicMetric<Long> _readAsyncCounter;
+  private SimpleDynamicMetric<Long> _writeAsyncCounter;
   private SimpleDynamicMetric<Long> _readBytesCounter;
   private SimpleDynamicMetric<Long> _writeBytesCounter;
   private SimpleDynamicMetric<Long> _readFailureCounter;
   private SimpleDynamicMetric<Long> _writeFailureCounter;
+  private SimpleDynamicMetric<Long> _readAsyncFailureCounter;
+  private SimpleDynamicMetric<Long> _writeAsyncFailureCounter;
   private SimpleDynamicMetric<Long> _readTotalLatencyCounter;
   private SimpleDynamicMetric<Long> _writeTotalLatencyCounter;
 
@@ -134,12 +142,20 @@ public class ZkClientPathMonitor extends 
DynamicMBeanProvider {
         new 
SimpleDynamicMetric(PredefinedMetricDomains.WriteFailureCounter.name(), 0l);
     _readFailureCounter =
         new 
SimpleDynamicMetric(PredefinedMetricDomains.ReadFailureCounter.name(), 0l);
+    _writeAsyncFailureCounter =
+        new 
SimpleDynamicMetric(PredefinedMetricDomains.WriteAsyncFailureCounter.name(), 
0l);
+    _readAsyncFailureCounter =
+        new 
SimpleDynamicMetric(PredefinedMetricDomains.ReadAsyncFailureCounter.name(), 0l);
     _writeBytesCounter =
         new 
SimpleDynamicMetric(PredefinedMetricDomains.WriteBytesCounter.name(), 0l);
     _readBytesCounter =
         new 
SimpleDynamicMetric(PredefinedMetricDomains.ReadBytesCounter.name(), 0l);
     _writeCounter = new 
SimpleDynamicMetric(PredefinedMetricDomains.WriteCounter.name(), 0l);
     _readCounter = new 
SimpleDynamicMetric(PredefinedMetricDomains.ReadCounter.name(), 0l);
+    _writeAsyncCounter =
+        new 
SimpleDynamicMetric(PredefinedMetricDomains.WriteAsyncCounter.name(), 0l);
+    _readAsyncCounter =
+        new 
SimpleDynamicMetric(PredefinedMetricDomains.ReadAsyncCounter.name(), 0l);
 
     _readLatencyGauge = new 
HistogramDynamicMetric(PredefinedMetricDomains.ReadLatencyGauge.name(),
         new Histogram(
@@ -169,10 +185,14 @@ public class ZkClientPathMonitor extends 
DynamicMBeanProvider {
     List<DynamicMetric<?, ?>> attributeList = new ArrayList<>();
     attributeList.add(_readCounter);
     attributeList.add(_writeCounter);
+    attributeList.add(_readAsyncCounter);
+    attributeList.add(_writeAsyncCounter);
     attributeList.add(_readBytesCounter);
     attributeList.add(_writeBytesCounter);
     attributeList.add(_readFailureCounter);
     attributeList.add(_writeFailureCounter);
+    attributeList.add(_readAsyncFailureCounter);
+    attributeList.add(_writeAsyncFailureCounter);
     attributeList.add(_readTotalLatencyCounter);
     attributeList.add(_writeTotalLatencyCounter);
     attributeList.add(_readLatencyGauge);
@@ -204,6 +224,18 @@ public class ZkClientPathMonitor extends 
DynamicMBeanProvider {
     }
   }
 
+  /**
+   * Records metrics for async operations
+   */
+  protected synchronized void recordAsync(int bytes, long latencyMilliSec, 
boolean isFailure,
+      ZkClientMonitor.AccessType accessType) {
+    if (isFailure) {
+      increaseAsyncFailureCounter(accessType);
+    } else {
+      increaseAsyncCounter(accessType);
+    }
+  }
+
   public void recordDataPropagationLatency(long latency) {
     _dataPropagationLatencyGauge.updateValue(latency);
     _dataPropagationLatencyGuage.updateValue(latency);
@@ -217,6 +249,19 @@ public class ZkClientPathMonitor extends 
DynamicMBeanProvider {
     }
   }
 
+  private void increaseAsyncFailureCounter(ZkClientMonitor.AccessType 
accessType) {
+    switch (accessType) {
+      case READ:
+        
_readAsyncFailureCounter.updateValue(_readAsyncFailureCounter.getValue() + 1);
+        return;
+      case WRITE:
+        
_writeAsyncFailureCounter.updateValue(_writeAsyncFailureCounter.getValue() + 1);
+        return;
+      default:
+        return;
+    }
+  }
+
   private void increaseCounter(boolean isRead) {
     if (isRead) {
       _readCounter.updateValue(_readCounter.getValue() + 1);
@@ -225,6 +270,19 @@ public class ZkClientPathMonitor extends 
DynamicMBeanProvider {
     }
   }
 
+  private void increaseAsyncCounter(ZkClientMonitor.AccessType accessType) {
+    switch (accessType) {
+      case READ:
+      _readAsyncCounter.updateValue(_readAsyncCounter.getValue() + 1);
+        return;
+      case WRITE:
+      _writeAsyncCounter.updateValue(_writeAsyncCounter.getValue() + 1);
+        return;
+      default:
+        return;
+    }
+  }
+
   private void increaseBytesCounter(boolean isRead, int bytes) {
     if (isRead) {
       _readBytesCounter.updateValue(_readBytesCounter.getValue() + bytes);
diff --git 
a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java
 
b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java
index c967553..22be865 100644
--- 
a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java
+++ 
b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java
@@ -289,7 +289,7 @@ public class TestRawZkClient extends ZkTestBase {
     Assert.assertEquals((long) beanServer.getAttribute(name, 
"OutstandingRequestGauge"), 0);
 
     boolean verifyResult = TestHelper.verify(()->{
-      return (long) beanServer.getAttribute(rootname, "ReadCounter") == 1;
+      return (long) beanServer.getAttribute(rootname, "ReadAsyncCounter") == 1;
     }, TestHelper.WAIT_DURATION);
     Assert.assertTrue(verifyResult, " did not see first sync() read");
 
@@ -301,7 +301,7 @@ public class TestRawZkClient extends ZkTestBase {
     Assert.assertTrue(firstReadLatencyGauge >= 0);
     zkClient.exists(TEST_ROOT);
 
-    Assert.assertTrue((long) beanServer.getAttribute(rootname, "ReadCounter") 
== 2);
+    Assert.assertTrue((long) beanServer.getAttribute(rootname, "ReadCounter") 
== 1);
 
     Assert.assertTrue((long) beanServer.getAttribute(rootname, 
"ReadTotalLatencyCounter") >= firstLatencyCounter);
     Assert.assertTrue((long) beanServer.getAttribute(rootname, 
"ReadLatencyGauge.Max") >= firstReadLatencyGauge);
@@ -333,7 +333,7 @@ public class TestRawZkClient extends ZkTestBase {
     Assert.assertTrue((long) beanServer.getAttribute(idealStatename, 
"WriteLatencyGauge.Max") >= 0);
 
     // Test read
-    Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"ReadCounter"), 2);
+    Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"ReadCounter"), 1);
     Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"ReadBytesCounter"), 0);
     Assert.assertEquals((long) beanServer.getAttribute(idealStatename, 
"ReadCounter"), 0);
     Assert.assertEquals((long) beanServer.getAttribute(idealStatename, 
"ReadBytesCounter"), 0);
@@ -344,7 +344,7 @@ public class TestRawZkClient extends ZkTestBase {
     Assert.assertEquals(origIdealStatesReadTotalLatencyCounter, 0);
     Assert.assertEquals((long) beanServer.getAttribute(idealStatename, 
"ReadLatencyGauge.Max"), 0);
     zkClient.readData(TEST_PATH, new Stat());
-    Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"ReadCounter"), 3);
+    Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"ReadCounter"), 2);
     Assert
         .assertEquals((long) beanServer.getAttribute(rootname, 
"ReadBytesCounter"), TEST_DATA_SIZE);
     Assert.assertEquals((long) beanServer.getAttribute(idealStatename, 
"ReadCounter"), 1);
@@ -356,27 +356,27 @@ public class TestRawZkClient extends ZkTestBase {
         >= origIdealStatesReadTotalLatencyCounter);
     Assert.assertTrue((long) beanServer.getAttribute(idealStatename, 
"ReadLatencyGauge.Max") >= 0);
     zkClient.getChildren(TEST_PATH);
-    Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"ReadCounter"), 4);
+    Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"ReadCounter"), 3);
     Assert
         .assertEquals((long) beanServer.getAttribute(rootname, 
"ReadBytesCounter"), TEST_DATA_SIZE);
     Assert.assertEquals((long) beanServer.getAttribute(idealStatename, 
"ReadCounter"), 2);
     Assert.assertEquals((long) beanServer.getAttribute(idealStatename, 
"ReadBytesCounter"),
         TEST_DATA_SIZE);
     zkClient.getStat(TEST_PATH);
-    Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"ReadCounter"), 5);
+    Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"ReadCounter"), 4);
     Assert
         .assertEquals((long) beanServer.getAttribute(rootname, 
"ReadBytesCounter"), TEST_DATA_SIZE);
     Assert.assertEquals((long) beanServer.getAttribute(idealStatename, 
"ReadCounter"), 3);
     Assert.assertEquals((long) beanServer.getAttribute(idealStatename, 
"ReadBytesCounter"),
         TEST_DATA_SIZE);
     zkClient.readDataAndStat(TEST_PATH, new Stat(), true);
-    Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"ReadCounter"), 6);
+    Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"ReadCounter"), 5);
 
     ZkAsyncCallbacks.ExistsCallbackHandler callbackHandler =
         new ZkAsyncCallbacks.ExistsCallbackHandler();
     zkClient.asyncExists(TEST_PATH, callbackHandler);
     callbackHandler.waitForSuccess();
-    Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"ReadCounter"), 7);
+    Assert.assertEquals((long) beanServer.getAttribute(rootname, 
"ReadAsyncCounter"), 2);
 
     // Test write
     zkClient.writeData(TEST_PATH, TEST_DATA);
diff --git 
a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestZkClientAsyncRetry.java
 
b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestZkClientAsyncRetry.java
index b45efcd..6845aa9 100644
--- 
a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestZkClientAsyncRetry.java
+++ 
b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestZkClientAsyncRetry.java
@@ -19,13 +19,22 @@ package org.apache.helix.zookeeper.impl.client;
  * under the License.
  */
 
+import java.lang.management.ManagementFactory;
+import javax.management.JMException;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
 import org.apache.helix.zookeeper.impl.ZkTestBase;
+import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallMonitorContext;
 import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks;
 import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncRetryCallContext;
 import org.apache.helix.zookeeper.zkclient.exception.ZkException;
 import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException;
+import org.apache.helix.zookeeper.zkclient.metric.ZkClientMonitor;
+import org.apache.helix.zookeeper.zkclient.metric.ZkClientPathMonitor;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.testng.Assert;
@@ -47,22 +56,55 @@ public class TestZkClientAsyncRetry extends ZkTestBase {
   // the test environment is slow. Extra wait time won't impact the test logic.
   private final long RETRY_OPS_WAIT_TIMEOUT_MS = 3 * 
MockAsyncZkClient.RETRY_INTERVAL_MS;
 
+  final String TEST_TAG = "test_tag";
+  final String TEST_KEY = "test_key";
+  final String TEST_INSTANCE = "test_instance";
+
   private org.apache.helix.zookeeper.zkclient.ZkClient _zkClient;
   private String _zkServerAddress;
 
+  private final MBeanServer _beanServer = 
ManagementFactory.getPlatformMBeanServer();
+  private ZkClientMonitor _monitor;
+  ObjectName _rootName;
+  int _readFailures;
+  int _writeFailures;
+
   @BeforeClass
-  public void beforeClass() {
+  public void beforeClass() throws JMException {
     _zkClient = _zkServerMap.values().iterator().next().getZkClient();
     _zkServerAddress = _zkClient.getServers();
     _zkClient.createPersistent(TEST_ROOT);
+
+    _monitor = new ZkClientMonitor(TEST_TAG, TEST_KEY, TEST_INSTANCE, false, 
null);
+    _monitor.register();
+
+    _rootName = buildPathMonitorObjectName(TEST_TAG, TEST_KEY, TEST_INSTANCE,
+        ZkClientPathMonitor.PredefinedPath.Root.name());
+    _readFailures = 0;
+    _writeFailures = 0;
   }
 
   @AfterClass
   public void afterClass() {
+    _monitor.unregister();
     _zkClient.deleteRecursively(TEST_ROOT);
     _zkClient.close();
   }
 
+  private boolean needRetry(int rc) {
+    switch (KeeperException.Code.get(rc)) {
+      /** Connection to the server has been lost */
+      case CONNECTIONLOSS:
+        /** The session has been expired by the server */
+      case SESSIONEXPIRED:
+        /** Session moved to another server, so operation is ignored */
+      case SESSIONMOVED:
+        return true;
+      default:
+        return false;
+    }
+  }
+
   private boolean waitAsyncOperation(ZkAsyncCallbacks.DefaultCallback 
callback, long timeout) {
     final boolean[] ret = { false };
     Thread waitThread = new Thread(() -> ret[0] = callback.waitForSuccess());
@@ -76,8 +118,19 @@ public class TestZkClientAsyncRetry extends ZkTestBase {
     }
   }
 
+  private ObjectName buildObjectName(String tag, String key, String instance)
+      throws MalformedObjectNameException {
+    return ZkClientMonitor.getObjectName(tag, key, instance);
+  }
+
+  private ObjectName buildPathMonitorObjectName(String tag, String key, String 
instance,
+      String path) throws MalformedObjectNameException {
+    return new ObjectName(String.format("%s,%s=%s", buildObjectName(tag, key, 
instance).toString(),
+        ZkClientPathMonitor.MONITOR_PATH, path));
+  }
+
   @Test
-  public void testAsyncRetryCategories() {
+  public void testAsyncRetryCategories() throws JMException {
     MockAsyncZkClient testZkClient = new MockAsyncZkClient(_zkServerAddress);
     try {
       ZNRecord tmpRecord = new ZNRecord("tmpRecord");
@@ -111,7 +164,11 @@ public class TestZkClientAsyncRetry extends ZkTestBase {
           Assert.assertTrue(waitAsyncOperation(createCallback, 
RETRY_OPS_WAIT_TIMEOUT_MS));
           Assert.assertEquals(createCallback.getRc(), code.intValue());
           Assert.assertEquals(testZkClient.getAndResetRetryCount(), 0);
+          ++_writeFailures;
         }
+        Assert.assertEquals((long) _beanServer.getAttribute(_rootName,
+            
ZkClientPathMonitor.PredefinedMetricDomains.WriteAsyncFailureCounter.toString()),
+            _writeFailures);
         testZkClient.delete(NODE_PATH);
         Assert.assertFalse(testZkClient.exists(NODE_PATH));
       }
@@ -123,7 +180,7 @@ public class TestZkClientAsyncRetry extends ZkTestBase {
   }
 
   @Test(dependsOnMethods = "testAsyncRetryCategories")
-  public void testAsyncWriteRetry() {
+  public void testAsyncWriteRetry() throws JMException {
     MockAsyncZkClient testZkClient = new MockAsyncZkClient(_zkServerAddress);
     try {
       ZNRecord tmpRecord = new ZNRecord("tmpRecord");
@@ -149,6 +206,10 @@ public class TestZkClientAsyncRetry extends ZkTestBase {
       Assert.assertEquals(((ZNRecord) 
testZkClient.readData(NODE_PATH)).getSimpleField("test"),
           "data");
       Assert.assertTrue(testZkClient.getAndResetRetryCount() >= 1);
+      // Check failure metric, which should be unchanged because the operation 
succeeded
+      Assert.assertEquals((long) _beanServer.getAttribute(_rootName,
+          
ZkClientPathMonitor.PredefinedMetricDomains.WriteAsyncFailureCounter.toString()),
+          _writeFailures);
 
       // 2. Test async delete
       ZkAsyncCallbacks.DeleteCallbackHandler deleteCallback =
@@ -167,6 +228,10 @@ public class TestZkClientAsyncRetry extends ZkTestBase {
       Assert.assertEquals(deleteCallback.getRc(), 
KeeperException.Code.OK.intValue());
       Assert.assertFalse(testZkClient.exists(NODE_PATH));
       Assert.assertTrue(testZkClient.getAndResetRetryCount() >= 1);
+      // Check failure metric, which should be unchanged because the operation 
succeeded
+      Assert.assertEquals((long) _beanServer.getAttribute(_rootName,
+          
ZkClientPathMonitor.PredefinedMetricDomains.WriteAsyncFailureCounter.toString()),
+          _writeFailures);
     } finally {
       testZkClient.setAsyncCallRC(KeeperException.Code.OK.intValue());
       testZkClient.close();
@@ -179,7 +244,7 @@ public class TestZkClientAsyncRetry extends ZkTestBase {
    * the context should be cancelled correctly.
    */
   @Test(dependsOnMethods = "testAsyncWriteRetry")
-  public void testAsyncWriteRetryThrowException() {
+  public void testAsyncWriteRetryThrowException() throws JMException {
     MockAsyncZkClient testZkClient = new MockAsyncZkClient(_zkServerAddress);
     try {
       ZNRecord tmpRecord = new ZNRecord("tmpRecord");
@@ -204,6 +269,10 @@ public class TestZkClientAsyncRetry extends ZkTestBase {
           "Async callback should have been canceled");
       Assert.assertEquals(createCallback.getRc(), CONNECTIONLOSS.intValue());
       Assert.assertTrue(testZkClient.getAndResetRetryCount() >= 1);
+      // Failure metric should be unchanged: the callback was canceled, so no 
failure is recorded
+      Assert.assertEquals((long) _beanServer.getAttribute(_rootName,
+          
ZkClientPathMonitor.PredefinedMetricDomains.WriteAsyncFailureCounter.toString()),
+          _writeFailures);
 
       // Restore the state
       testZkClient.setZkExceptionInRetry(false);
@@ -226,6 +295,10 @@ public class TestZkClientAsyncRetry extends ZkTestBase {
           "Async callback should have been canceled");
       Assert.assertEquals(setCallback.getRc(), CONNECTIONLOSS.intValue());
       Assert.assertTrue(testZkClient.getAndResetRetryCount() >= 1);
+      // Failure metric should be unchanged: the callback was canceled, so no 
failure is recorded
+      Assert.assertEquals((long) _beanServer.getAttribute(_rootName,
+          
ZkClientPathMonitor.PredefinedMetricDomains.WriteAsyncFailureCounter.toString()),
+          _writeFailures);
     } finally {
       testZkClient.setAsyncCallRC(KeeperException.Code.OK.intValue());
       testZkClient.close();
@@ -234,7 +307,7 @@ public class TestZkClientAsyncRetry extends ZkTestBase {
   }
 
   @Test(dependsOnMethods = "testAsyncWriteRetryThrowException")
-  public void testAsyncReadRetry() {
+  public void testAsyncReadRetry() throws JMException {
     MockAsyncZkClient testZkClient = new MockAsyncZkClient(_zkServerAddress);
     try {
       ZNRecord tmpRecord = new ZNRecord("tmpRecord");
@@ -258,6 +331,10 @@ public class TestZkClientAsyncRetry extends ZkTestBase {
       Assert.assertEquals(existsCallback.getRc(), 
KeeperException.Code.OK.intValue());
       Assert.assertTrue(existsCallback._stat != null);
       Assert.assertTrue(testZkClient.getAndResetRetryCount() >= 1);
+      // Check failure metric, which should be unchanged because the operation 
succeeded
+      Assert.assertEquals((long) _beanServer.getAttribute(_rootName,
+          
ZkClientPathMonitor.PredefinedMetricDomains.ReadAsyncFailureCounter.toString()),
+          _readFailures);
 
       // 2. Test async get
       ZkAsyncCallbacks.GetDataCallbackHandler getCallback =
@@ -277,6 +354,10 @@ public class TestZkClientAsyncRetry extends ZkTestBase {
       ZNRecord record = testZkClient.deserialize(getCallback._data, NODE_PATH);
       Assert.assertEquals(record.getSimpleField("foo"), "bar");
       Assert.assertTrue(testZkClient.getAndResetRetryCount() >= 1);
+      // Check failure metric, which should be unchanged because the operation 
succeeded
+      Assert.assertEquals((long) _beanServer.getAttribute(_rootName,
+          
ZkClientPathMonitor.PredefinedMetricDomains.ReadAsyncFailureCounter.toString()),
+          _readFailures);
     } finally {
       testZkClient.setAsyncCallRC(KeeperException.Code.OK.intValue());
       testZkClient.close();
@@ -285,7 +366,7 @@ public class TestZkClientAsyncRetry extends ZkTestBase {
   }
 
   @Test(dependsOnMethods = "testAsyncReadRetry")
-  public void testAsyncRequestCleanup() {
+  public void testAsyncRequestCleanup() throws JMException {
     int cbCount = 10;
     MockAsyncZkClient testZkClient = new MockAsyncZkClient(_zkServerAddress);
     try {
@@ -316,6 +397,11 @@ public class TestZkClientAsyncRetry extends ZkTestBase {
       for (ZkAsyncCallbacks.ExistsCallbackHandler cb : existsCallbacks) {
         Assert.assertTrue(waitAsyncOperation(cb, RETRY_OPS_WAIT_TIMEOUT_MS));
         Assert.assertEquals(cb.getRc(), CONNECTIONLOSS.intValue());
+        // The failure metric doesn't increase here, because an exception is 
thrown before the logic
+        // responsible for increasing the metric is reached.
+        Assert.assertEquals((long) _beanServer.getAttribute(_rootName,
+            
ZkClientPathMonitor.PredefinedMetricDomains.ReadAsyncFailureCounter.toString()),
+            _readFailures);
       }
       Assert.assertTrue(testZkClient.getAndResetRetryCount() >= 1);
     } finally {
@@ -325,6 +411,93 @@ public class TestZkClientAsyncRetry extends ZkTestBase {
     }
   }
 
+  @Test(dependsOnMethods = "testAsyncRequestCleanup")
+  public void testAsyncFailureMetrics() throws JMException {
+    // The remaining failure paths that weren't covered in other test methods 
are tested here
+    MockAsyncZkClient testZkClient = new MockAsyncZkClient(_zkServerAddress);
+    try {
+      ZNRecord tmpRecord = new ZNRecord("tmpRecord");
+      tmpRecord.setSimpleField("foo", "bar");
+      testZkClient.createPersistent(NODE_PATH, tmpRecord);
+
+      // Test asyncGet failure
+      ZkAsyncCallbacks.GetDataCallbackHandler getCallback =
+          new ZkAsyncCallbacks.GetDataCallbackHandler();
+      Assert.assertEquals(getCallback.getRc(), UNKNOWN_RET_CODE);
+      // asyncGet should fail because the return code is APIERROR
+      testZkClient.setAsyncCallRC(KeeperException.Code.APIERROR.intValue());
+      testZkClient.asyncGetData(NODE_PATH, getCallback);
+      getCallback.waitForSuccess();
+      Assert.assertEquals(getCallback.getRc(), 
KeeperException.Code.APIERROR.intValue());
+      ++_readFailures;
+      Assert.assertEquals((long) _beanServer.getAttribute(_rootName,
+          
ZkClientPathMonitor.PredefinedMetricDomains.ReadAsyncFailureCounter.toString()),
+          _readFailures);
+      // asyncGet should succeed because the return code is NONODE
+      testZkClient.setAsyncCallRC(KeeperException.Code.NONODE.intValue());
+      testZkClient.asyncGetData(NODE_PATH, getCallback);
+      getCallback.waitForSuccess();
+      Assert.assertEquals(getCallback.getRc(), 
KeeperException.Code.NONODE.intValue());
+      Assert.assertEquals((long) _beanServer.getAttribute(_rootName,
+          
ZkClientPathMonitor.PredefinedMetricDomains.ReadAsyncFailureCounter.toString()),
+          _readFailures);
+
+      // Test asyncExists failure
+      ZkAsyncCallbacks.ExistsCallbackHandler existsCallback =
+          new ZkAsyncCallbacks.ExistsCallbackHandler();
+      Assert.assertEquals(existsCallback.getRc(), UNKNOWN_RET_CODE);
+      // asyncExists should fail because the return code is APIERROR
+      testZkClient.setAsyncCallRC(KeeperException.Code.APIERROR.intValue());
+      testZkClient.asyncExists(NODE_PATH, existsCallback);
+      existsCallback.waitForSuccess();
+      Assert.assertEquals(existsCallback.getRc(), 
KeeperException.Code.APIERROR.intValue());
+      ++_readFailures;
+      Assert.assertEquals((long) _beanServer.getAttribute(_rootName,
+          
ZkClientPathMonitor.PredefinedMetricDomains.ReadAsyncFailureCounter.toString()),
+          _readFailures);
+      // asyncExists should succeed because the return code is NONODE
+      testZkClient.setAsyncCallRC(KeeperException.Code.NONODE.intValue());
+      testZkClient.asyncExists(NODE_PATH, existsCallback);
+      existsCallback.waitForSuccess();
+      Assert.assertEquals(existsCallback.getRc(), 
KeeperException.Code.NONODE.intValue());
+      Assert.assertEquals((long) _beanServer.getAttribute(_rootName,
+          
ZkClientPathMonitor.PredefinedMetricDomains.ReadAsyncFailureCounter.toString()),
+          _readFailures);
+
+      // Test asyncSet failure
+      ZkAsyncCallbacks.SetDataCallbackHandler setCallback =
+          new ZkAsyncCallbacks.SetDataCallbackHandler();
+      Assert.assertEquals(setCallback.getRc(), UNKNOWN_RET_CODE);
+      // asyncSet should fail because the return code is APIERROR
+      testZkClient.setAsyncCallRC(KeeperException.Code.APIERROR.intValue());
+      testZkClient.asyncSetData(NODE_PATH, tmpRecord, -1, setCallback);
+      setCallback.waitForSuccess();
+      Assert.assertEquals(setCallback.getRc(), 
KeeperException.Code.APIERROR.intValue());
+      ++_writeFailures;
+      Assert.assertEquals((long) _beanServer.getAttribute(_rootName,
+          
ZkClientPathMonitor.PredefinedMetricDomains.WriteAsyncFailureCounter.toString()),
+          _writeFailures);
+
+      // Test asyncDelete failure
+      ZkAsyncCallbacks.DeleteCallbackHandler deleteCallback =
+          new ZkAsyncCallbacks.DeleteCallbackHandler();
+      Assert.assertEquals(deleteCallback.getRc(), UNKNOWN_RET_CODE);
+      // asyncDelete should fail because the return code is APIERROR
+      testZkClient.setAsyncCallRC(KeeperException.Code.APIERROR.intValue());
+      testZkClient.asyncDelete(NODE_PATH, deleteCallback);
+      deleteCallback.waitForSuccess();
+      Assert.assertEquals(deleteCallback.getRc(), 
KeeperException.Code.APIERROR.intValue());
+      ++_writeFailures;
+      Assert.assertEquals((long) _beanServer.getAttribute(_rootName,
+          
ZkClientPathMonitor.PredefinedMetricDomains.WriteAsyncFailureCounter.toString()),
+          _writeFailures);
+    } finally {
+      testZkClient.setAsyncCallRC(KeeperException.Code.OK.intValue());
+      testZkClient.close();
+      _zkClient.delete(NODE_PATH);
+    }
+  }
+
   /**
    * Mock client to whitebox test async functionality.
    */
@@ -365,7 +538,7 @@ public class TestZkClientAsyncRetry extends ZkTestBase {
       if (_asyncCallRetCode == KeeperException.Code.OK.intValue()) {
         super.asyncCreate(path, datat, mode, cb);
         return;
-      } else {
+      } else if (needRetry(_asyncCallRetCode)) {
         cb.processResult(_asyncCallRetCode, path,
             new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, null, 0, 0, 
false) {
               @Override
@@ -374,6 +547,9 @@ public class TestZkClientAsyncRetry extends ZkTestBase {
                 asyncCreate(path, datat, mode, cb);
               }
             }, null);
+      } else {
+        cb.processResult(_asyncCallRetCode, path,
+            new ZkAsyncCallMonitorContext(_monitor, 0, 0, false), null);
       }
     }
 
@@ -383,7 +559,7 @@ public class TestZkClientAsyncRetry extends ZkTestBase {
       if (_asyncCallRetCode == KeeperException.Code.OK.intValue()) {
         super.asyncSetData(path, datat, version, cb);
         return;
-      } else {
+      } else if (needRetry(_asyncCallRetCode)) {
         cb.processResult(_asyncCallRetCode, path,
             new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, null, 0, 0, 
false) {
               @Override
@@ -392,6 +568,9 @@ public class TestZkClientAsyncRetry extends ZkTestBase {
                 asyncSetData(path, datat, version, cb);
               }
             }, null);
+      } else {
+        cb.processResult(_asyncCallRetCode, path,
+            new ZkAsyncCallMonitorContext(_monitor, 0, 0, false), null);
       }
     }
 
@@ -400,7 +579,7 @@ public class TestZkClientAsyncRetry extends ZkTestBase {
       if (_asyncCallRetCode == KeeperException.Code.OK.intValue()) {
         super.asyncGetData(path, cb);
         return;
-      } else {
+      } else if (needRetry(_asyncCallRetCode)) {
         cb.processResult(_asyncCallRetCode, path,
             new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, null, 0, 0, 
true) {
               @Override
@@ -409,6 +588,9 @@ public class TestZkClientAsyncRetry extends ZkTestBase {
                 asyncGetData(path, cb);
               }
             }, null, null);
+      } else {
+        cb.processResult(_asyncCallRetCode, path,
+            new ZkAsyncCallMonitorContext(_monitor, 0, 0, true), null, null);
       }
     }
 
@@ -417,7 +599,7 @@ public class TestZkClientAsyncRetry extends ZkTestBase {
       if (_asyncCallRetCode == KeeperException.Code.OK.intValue()) {
         super.asyncExists(path, cb);
         return;
-      } else {
+      } else if (needRetry(_asyncCallRetCode)) {
         cb.processResult(_asyncCallRetCode, path,
             new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, null, 0, 0, 
true) {
               @Override
@@ -426,6 +608,9 @@ public class TestZkClientAsyncRetry extends ZkTestBase {
                 asyncExists(path, cb);
               }
             }, null);
+      } else {
+        cb.processResult(_asyncCallRetCode, path,
+            new ZkAsyncCallMonitorContext(_monitor, 0, 0, true), null);
       }
     }
 
@@ -434,7 +619,7 @@ public class TestZkClientAsyncRetry extends ZkTestBase {
       if (_asyncCallRetCode == KeeperException.Code.OK.intValue()) {
         super.asyncDelete(path, cb);
         return;
-      } else {
+      } else if (needRetry(_asyncCallRetCode)) {
         cb.processResult(_asyncCallRetCode, path,
             new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, null, 0, 0, 
false) {
               @Override
@@ -443,6 +628,9 @@ public class TestZkClientAsyncRetry extends ZkTestBase {
                 asyncDelete(path, cb);
               }
             });
+      } else {
+        cb.processResult(_asyncCallRetCode, path,
+            new ZkAsyncCallMonitorContext(_monitor, 0, 0, false));
       }
     }
 

Reply via email to