This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 655fc0913 fix(java-client): fix the failed java-client test (#1836)
655fc0913 is described below

commit 655fc09136d4d1dc2949a949bb537c4db705a92f
Author: Yingchun Lai <[email protected]>
AuthorDate: Wed Jan 10 14:16:17 2024 +0800

    fix(java-client): fix the failed java-client test (#1836)
    
    Fix the failed java-client tests, and improve some of the code style, 
including:
    - Install `net-tools`, which is needed by the test tool Toollet.java; it uses 
`netstat`
    to find some processes.
    - Improve the flaky test testOperationTimeout.
    - Use an independent hashkey for testScanRangeWithValueExpired to avoid
    conflicts.
    - Improve some code style.
    - Update the README.md.
---
 .github/workflows/lint_and_test_java-client.yml    |   3 +-
 java-client/README.md                              |  51 ++--
 .../java/org/apache/pegasus/client/TestBasic.java  | 270 ++++++++++++---------
 .../pegasus/rpc/async/ReplicaSessionTest.java      |  65 ++---
 .../java/org/apache/pegasus/tools/Toollet.java     |   2 +-
 5 files changed, 215 insertions(+), 176 deletions(-)

diff --git a/.github/workflows/lint_and_test_java-client.yml 
b/.github/workflows/lint_and_test_java-client.yml
index 119877d0e..a11ebb5ee 100644
--- a/.github/workflows/lint_and_test_java-client.yml
+++ b/.github/workflows/lint_and_test_java-client.yml
@@ -94,8 +94,7 @@ jobs:
       - name: Start Pegasus cluster
         run: |
           apt-get update
-          apt-get -y install tree
-          tree ${JAVA_HOME}
+          apt-get install net-tools -y
           export 
LD_LIBRARY_PATH=$(pwd)/thirdparty/output/lib:${JAVA_HOME}/jre/lib/amd64/server
           ulimit -s unlimited
           ./run.sh start_onebox
diff --git a/java-client/README.md b/java-client/README.md
index 44a1cb393..24229cbb8 100644
--- a/java-client/README.md
+++ b/java-client/README.md
@@ -19,43 +19,54 @@ under the License.
 
 # Pegasus Java Client
 
-## Build
+## Development
+
+### 1. Prepare
 
 ```
-cd scripts && sh recompile_thrift.sh && cd -
-mvn spotless:apply
-mvn clean package -DskipTests
+cd scripts && bash recompile_thrift.sh
 ```
 
-## Install
+### 2. Format the code
 
 ```
-cd scripts && sh recompile_thrift.sh && cd -
 mvn spotless:apply
-mvn clean install -DskipTests
 ```
 
-## Test
+### 3. Build
+
+```
+mvn clean package -DskipTests
+```
+
+### 4. Test
 
-To run test, you should start pegasus onebox firstly, and run test as:
+To run test, you should prepare the test environment:
+1. Start [Pegasus onebox](https://pegasus.apache.org/overview/onebox/)
+2. Install some necessary tools, such as `net-tools`
 
 ```
-cd scripts && sh recompile_thrift.sh && cd -
-mvn spotless:apply
 mvn clean package
 ```
 
-or specify one test:
+Or specify one test:
 
 ```
-cd scripts && sh recompile_thrift.sh && cd -
-mvn spotless:apply
 mvn clean package -Dtest=TestPing
 ```
 
-## Configuration
+## Using Pegasus Java client
+
+### Install
+
+```
+cd scripts && bash recompile_thrift.sh && cd -
+mvn clean install -DskipTests
+```
+
+### Configuration
 
-Configure client by "pegasus.properties", for example:
+Configure client by `pegasus.properties` file, for example:
 
 ```
 meta_servers = 127.0.0.1:34601,127.0.0.1:34602,127.0.0.1:34603
@@ -66,14 +77,14 @@ perf_counter_tags = k1=v1,k2=v2,k3=v3
 push_counter_interval_secs = 10
 ```
 
-You can provide a parameter of 'configPath' when creating a client instance.
+You can provide a parameter of `configPath` when creating a client instance.
 
-The format of 'configPath' should be one of these:
+The format of `configPath` should be one of these:
 * zk path: zk://host1:port1,host2:port2,host3:port3/path/to/config
 * local file path: file:///path/to/config
 * resource path: resource:///path/to/config
 
-## PerfCounter(Metrics)
+### PerfCounter (Metrics)
 
 Pegasus Java Client supports QPS and latency statistics of requests.
 
@@ -93,7 +104,7 @@ For each type of request(get, set, multiset, etc.), we 
collect 8 metrics:
 5. latency-p50: the moving median of request's queries
 6. latency-p99: the moving p99 of request's queries
 7. lantecy-p999: the moving p999 of request's queries
-8: latency-max: the moving max of request's queries
+8. latency-max: the moving max of request's queries
 
 We use io.dropwizard.metrics library to calculate the request count.
 
diff --git a/java-client/src/test/java/org/apache/pegasus/client/TestBasic.java 
b/java-client/src/test/java/org/apache/pegasus/client/TestBasic.java
index d753299ee..c006ab0b8 100644
--- a/java-client/src/test/java/org/apache/pegasus/client/TestBasic.java
+++ b/java-client/src/test/java/org/apache/pegasus/client/TestBasic.java
@@ -38,9 +38,12 @@ import java.util.Map;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
 
 /** Created by mi on 16-3-22. */
 public class TestBasic {
+  private static final Logger logger = 
org.slf4j.LoggerFactory.getLogger(TestBasic.class);
+
   private static final String basicSetGetDelHashKey = 
"TestBasic_testSetGetDel_hash_key_1";
   private static final String multiSetGetDelHashKey = 
"TestBasic_testMultiSetGetDel_hash_key_1";
   private static final String multiGetHashKey = 
"TestBasic_testMultiGet_hash_key_1";
@@ -2640,7 +2643,7 @@ public class TestBasic {
   @Test // test for making sure return "maxFetchCount" if has "maxFetchCount" 
valid record
   public void testScanRangeWithValueExpired() throws PException, 
InterruptedException {
     String tableName = "temp";
-    String hashKey = "hashKey";
+    String hashKey = "hashKey_testScanRangeWithValueExpired";
     // generate records: 
sortKeys=[expired_0....expired_999,persistent_0...persistent_9]
     generateRecordsWithExpired(tableName, hashKey, 1000, 10);
 
@@ -2649,73 +2652,119 @@ public class TestBasic {
     // case A: scan all record
     // case A1: scan all record: if persistent record count >= maxFetchCount, 
it must return
     // maxFetchCount records
+    int maxFetchCount = 5;
     PegasusTable.ScanRangeResult caseA1 =
-        table.scanRange(hashKey.getBytes(), null, null, new ScanOptions(), 5, 
0);
-    assertScanResult(0, 4, false, caseA1);
+        table.scanRange(hashKey.getBytes(), null, null, new ScanOptions(), 
maxFetchCount, 0);
+    assertScanResult(hashKey, 0, 4, false, caseA1);
     // case A2: scan all record: if persistent record count < maxFetchCount, 
it only return
     // persistent count records
+    maxFetchCount = 100;
     PegasusTable.ScanRangeResult caseA2 =
-        table.scanRange(hashKey.getBytes(), null, null, new ScanOptions(), 
100, 0);
-    assertScanResult(0, 9, true, caseA2);
+        table.scanRange(hashKey.getBytes(), null, null, new ScanOptions(), 
maxFetchCount, 0);
+    assertScanResult(hashKey, 0, 9, true, caseA2);
 
     // case B: scan limit record by "startSortKey" and "":
     // case B1: scan limit record by "expired_0" and "", if persistent record 
count >=
     // maxFetchCount, it must return maxFetchCount records
+    maxFetchCount = 5;
     PegasusTable.ScanRangeResult caseB1 =
         table.scanRange(
-            hashKey.getBytes(), "expired_0".getBytes(), "".getBytes(), new 
ScanOptions(), 5, 0);
-    assertScanResult(0, 4, false, caseB1);
+            hashKey.getBytes(),
+            "expired_0".getBytes(),
+            "".getBytes(),
+            new ScanOptions(),
+            maxFetchCount,
+            0);
+    assertScanResult(hashKey, 0, 4, false, caseB1);
     // case B2: scan limit record by "expired_0" and "", if persistent record 
count < maxFetchCount,
     // it only return valid records
+    maxFetchCount = 50;
     PegasusTable.ScanRangeResult caseB2 =
         table.scanRange(
-            hashKey.getBytes(), "expired_0".getBytes(), "".getBytes(), new 
ScanOptions(), 50, 0);
-    assertScanResult(0, 9, true, caseB2);
+            hashKey.getBytes(),
+            "expired_0".getBytes(),
+            "".getBytes(),
+            new ScanOptions(),
+            maxFetchCount,
+            0);
+    assertScanResult(hashKey, 0, 9, true, caseB2);
     // case B3: scan limit record by "persistent_5" and "", if following 
persistent record count <
     // maxFetchCount, it only return valid records
+    maxFetchCount = 50;
     PegasusTable.ScanRangeResult caseB3 =
         table.scanRange(
-            hashKey.getBytes(), "persistent_5".getBytes(), "".getBytes(), new 
ScanOptions(), 50, 0);
-    assertScanResult(5, 9, true, caseB3);
+            hashKey.getBytes(),
+            "persistent_5".getBytes(),
+            "".getBytes(),
+            new ScanOptions(),
+            maxFetchCount,
+            0);
+    assertScanResult(hashKey, 5, 9, true, caseB3);
     // case B4: scan limit record by "persistent_5" and "", if following 
persistent record count >
     // maxFetchCount, it only return valid records
+    maxFetchCount = 3;
     PegasusTable.ScanRangeResult caseB4 =
         table.scanRange(
-            hashKey.getBytes(), "persistent_5".getBytes(), "".getBytes(), new 
ScanOptions(), 3, 0);
-    assertScanResult(5, 7, false, caseB4);
+            hashKey.getBytes(),
+            "persistent_5".getBytes(),
+            "".getBytes(),
+            new ScanOptions(),
+            maxFetchCount,
+            0);
+    assertScanResult(hashKey, 5, 7, false, caseB4);
 
     // case C: scan limit record by "" and "stopSortKey":
     // case C1: scan limit record by "" and "expired_7", if will return 0 
record
+    maxFetchCount = 3;
     PegasusTable.ScanRangeResult caseC1 =
         table.scanRange(
-            hashKey.getBytes(), "".getBytes(), "expired_7".getBytes(), new 
ScanOptions(), 3, 0);
+            hashKey.getBytes(),
+            "".getBytes(),
+            "expired_7".getBytes(),
+            new ScanOptions(),
+            maxFetchCount,
+            0);
     assertTrue(caseC1.allFetched);
     assertEquals(0, caseC1.results.size()); // among "" and "expired_7" has 0 
valid record
     // case C2: scan limit record by "" and "persistent_7", if valid record 
count < maxFetchCount,
     // it only return valid record
+    maxFetchCount = 10;
     PegasusTable.ScanRangeResult caseC2 =
         table.scanRange(
-            hashKey.getBytes(), "".getBytes(), "persistent_7".getBytes(), new 
ScanOptions(), 10, 0);
-    assertScanResult(0, 6, true, caseC2);
+            hashKey.getBytes(),
+            "".getBytes(),
+            "persistent_7".getBytes(),
+            new ScanOptions(),
+            maxFetchCount,
+            0);
+    assertScanResult(hashKey, 0, 6, true, caseC2);
     // case C3: scan limit record by "" and "persistent_7", if valid record 
count > maxFetchCount,
     // it only return valid record
+    maxFetchCount = 2;
     PegasusTable.ScanRangeResult caseC3 =
         table.scanRange(
-            hashKey.getBytes(), "".getBytes(), "persistent_7".getBytes(), new 
ScanOptions(), 2, 0);
-    assertScanResult(0, 1, false, caseC3);
+            hashKey.getBytes(),
+            "".getBytes(),
+            "persistent_7".getBytes(),
+            new ScanOptions(),
+            maxFetchCount,
+            0);
+    assertScanResult(hashKey, 0, 1, false, caseC3);
 
     // case D: use multiGetSortKeys, which actually equal with case A but no 
value
     // case D1: maxFetchCount > 0, return maxFetchCount valid record
+    maxFetchCount = -1;
     PegasusTableInterface.MultiGetSortKeysResult caseD1 =
-        table.multiGetSortKeys(hashKey.getBytes(), 5, -1, 0);
+        table.multiGetSortKeys(hashKey.getBytes(), 5, maxFetchCount, 0);
     assertFalse(caseD1.allFetched);
     assertEquals(5, caseD1.keys.size());
     for (int i = 0; i <= 4; i++) {
       assertEquals("persistent_" + i, new String(caseD1.keys.get(i)));
     }
     // case D1: maxFetchCount < 0, return all valid record
+    maxFetchCount = -1;
     PegasusTableInterface.MultiGetSortKeysResult caseD2 =
-        table.multiGetSortKeys(hashKey.getBytes(), 10, -1, 0);
+        table.multiGetSortKeys(hashKey.getBytes(), 10, maxFetchCount, 0);
     assertTrue(caseD2.allFetched);
     assertEquals(10, caseD2.keys.size());
     for (int i = 0; i <= 9; i++) {
@@ -2727,7 +2776,8 @@ public class TestBasic {
       String tableName, String hashKey, int expiredCount, int persistentCount)
       throws PException, InterruptedException {
     PegasusClientInterface client = PegasusClientFactory.getSingletonClient();
-    // assign prefix to make sure the expire record is stored front of 
persistent
+    // Specify the prefixes to make sure the expired records are stored in 
front of the persistent
+    // ones.
     String expiredSortKeyPrefix = "expired_";
     String persistentSortKeyPrefix = "persistent_";
     while (expiredCount-- > 0) {
@@ -2738,7 +2788,7 @@ public class TestBasic {
           (expiredSortKeyPrefix + expiredCount + "_value").getBytes(),
           1);
     }
-    // sleep to make sure the record is expired
+    // Sleep a while to make sure the records are expired.
     Thread.sleep(1000);
     while (persistentCount-- > 0) {
       client.set(
@@ -2751,6 +2801,7 @@ public class TestBasic {
   }
 
   private void assertScanResult(
+      String hashKey,
       int startIndex,
       int stopIndex,
       boolean expectAllFetched,
@@ -2758,8 +2809,7 @@ public class TestBasic {
     assertEquals(expectAllFetched, actuallyRes.allFetched);
     assertEquals(stopIndex - startIndex + 1, actuallyRes.results.size());
     for (int i = startIndex; i <= stopIndex; i++) {
-      assertEquals(
-          "hashKey", new String(actuallyRes.results.get(i - 
startIndex).getLeft().getKey()));
+      assertEquals(hashKey, new String(actuallyRes.results.get(i - 
startIndex).getLeft().getKey()));
       assertEquals(
           "persistent_" + i,
           new String(actuallyRes.results.get(i - 
startIndex).getLeft().getValue()));
@@ -2770,101 +2820,103 @@ public class TestBasic {
   }
 
   @Test
-  public void testRequestDetail() throws PException {
+  public void testOperationTimeout() throws PException {
+    String tableName = "temp";
+
+    // Create a PegasusClientInterface object 'client' which causes exceptions 
throw out.
     Duration caseTimeout = Duration.ofMillis(1);
     ClientOptions client_opt = 
ClientOptions.builder().operationTimeout(caseTimeout).build();
-
-    PegasusClientFactory.createClient(client_opt);
     PegasusClientInterface client = 
PegasusClientFactory.createClient(client_opt);
-    String tableName = "temp";
-    PegasusTableInterface tb = client.openTable(tableName);
 
-    String HashPrefix = "TestHash";
-    String SortPrefix = "TestSort";
-    String hashKey = HashPrefix + "_0";
-    String sortKey = SortPrefix + "_0";
+    // Create a PegasusTableInterface object 'tb' which doesn't cause 
exceptions throw out.
+    PegasusClientInterface client2 = 
PegasusClientFactory.createClient(ClientOptions.create());
+    PegasusTableInterface tb = client2.openTable(tableName);
+
+    String hashKey = "TestHash_0";
+    String sortKey = "TestSort_0";
 
     try {
-      // multiSet timeout
-      System.out.println("Test multiSet PException request");
-
-      String multiValue2 = RandomStringUtils.random(5, true, true);
-      List<Pair<byte[], byte[]>> multiValues2 = new ArrayList<Pair<byte[], 
byte[]>>();
-      int count2 = 500;
-      while (count2-- > 0) {
-        multiValues2.add(Pair.of(sortKey.getBytes(), multiValue2.getBytes()));
+      logger.info("Test multiSet PException request");
+      {
+        String value = RandomStringUtils.random(5, true, true);
+        List<Pair<byte[], byte[]>> values = new ArrayList<Pair<byte[], 
byte[]>>();
+        int count = 500;
+        while (count-- > 0) {
+          values.add(Pair.of(sortKey.getBytes(), value.getBytes()));
+        }
+
+        Throwable exception =
+            assertThrows(
+                PException.class,
+                () -> {
+                  client.multiSet(tableName, hashKey.getBytes(), values);
+                });
+        assertTrue(
+            exception
+                .getMessage()
+                .contains(
+                    
"request=[hashKey[:32]=\"TestHash_0\",sortKey[:32]=\"\",sortKeyCount=500,valueLength=2500]"));
       }
 
-      Throwable exception =
-          assertThrows(
-              PException.class,
-              () -> {
-                client.multiSet(tableName, hashKey.getBytes(), multiValues2);
-              });
-      assertTrue(
-          exception
-              .getMessage()
-              .contains(
-                  
"request=[hashKey[:32]=\"TestHash_0\",sortKey[:32]=\"\",sortKeyCount=500,valueLength=2500]"));
-
-      // checkAndMutate timeout
-      System.out.println("Test checkAndMutate PException request");
-      Mutations mutations = new Mutations();
-      mutations.set(sortKey.getBytes(), "2".getBytes());
-
-      CheckAndMutateOptions options = new CheckAndMutateOptions();
-      options.returnCheckValue = true;
-      Throwable exception2 =
-          assertThrows(
-              PException.class,
-              () -> {
-                client.checkAndMutate(
-                    tableName,
-                    hashKey.getBytes(),
-                    "k5".getBytes(),
-                    CheckType.CT_VALUE_INT_LESS,
-                    "2".getBytes(),
-                    mutations,
-                    options);
-              });
-      assertTrue(
-          exception2
-              .getMessage()
-              .contains(
-                  
"request=[hashKey[:32]=\"TestHash_0\",sortKey[:32]=\"k5\",sortKeyCount=1,valueLength=1]"));
-
-      // multiDel timeout
-      System.out.println("Test multiDel PException request");
-      List<Pair<byte[], byte[]>> multiValues3 = new ArrayList<Pair<byte[], 
byte[]>>();
-      List<byte[]> sortKeys = new ArrayList<byte[]>();
-      multiValues3.add(
-          Pair.of("basic_test_sort_key_0".getBytes(), 
"basic_test_value_0".getBytes()));
-      multiValues3.add(
-          Pair.of("basic_test_sort_key_1".getBytes(), 
"basic_test_value_1".getBytes()));
-      multiValues3.add(
-          Pair.of("basic_test_sort_key_2".getBytes(), 
"basic_test_value_2".getBytes()));
-      sortKeys.add("basic_test_sort_key_0".getBytes());
-      sortKeys.add("basic_test_sort_key_1".getBytes());
-      sortKeys.add("basic_test_sort_key_2".getBytes());
+      logger.info("Test checkAndMutate PException request");
+      {
+        Mutations mutations = new Mutations();
+        mutations.set(sortKey.getBytes(), "2".getBytes());
 
-      tb.multiSet(hashKey.getBytes(), multiValues3, 5000);
-      assertDoesNotThrow(
-          () -> {
-            tb.multiSet(hashKey.getBytes(), multiValues3, 5000);
-          });
-
-      Throwable exception3 =
-          assertThrows(
-              PException.class,
-              () -> {
-                client.multiDel(tableName, hashKey.getBytes(), sortKeys);
-              });
-      assertTrue(
-          exception3
-              .getMessage()
-              .contains(
-                  
"request=[hashKey[:32]=\"TestHash_0\",sortKey[:32]=\"\",sortKeyCount=3,valueLength=-1]"));
+        CheckAndMutateOptions options = new CheckAndMutateOptions();
+        options.returnCheckValue = true;
+        Throwable exception =
+            assertThrows(
+                PException.class,
+                () -> {
+                  client.checkAndMutate(
+                      tableName,
+                      hashKey.getBytes(),
+                      "k5".getBytes(),
+                      CheckType.CT_VALUE_INT_LESS,
+                      "2".getBytes(),
+                      mutations,
+                      options);
+                });
+        assertTrue(
+            exception
+                .getMessage()
+                .contains(
+                    
"request=[hashKey[:32]=\"TestHash_0\",sortKey[:32]=\"k5\",sortKeyCount=1,valueLength=1]"));
+      }
+
+      logger.info("Test multiDel PException request");
+      {
+        List<byte[]> sortKeys = new ArrayList<byte[]>();
+        List<Pair<byte[], byte[]>> values = new ArrayList<Pair<byte[], 
byte[]>>();
+        int count = 500;
+        while (count-- > 0) {
+          String tempSortKey = "basic_test_sort_key_0";
+          sortKeys.add(tempSortKey.getBytes());
+          values.add(Pair.of(tempSortKey.getBytes(), 
"basic_test_value_0".getBytes()));
+        }
+
+        // Expect multiSet with a higher timeout will not throw exception.
+        assertDoesNotThrow(
+            () -> {
+              tb.multiSet(hashKey.getBytes(), values, 5000);
+            });
+
+        // Expect multiSet with a lower timeout will throw exception.
+        Throwable exception =
+            assertThrows(
+                PException.class,
+                () -> {
+                  client.multiDel(tableName, hashKey.getBytes(), sortKeys);
+                });
+        assertTrue(
+            exception
+                .getMessage()
+                .contains(
+                    
"request=[hashKey[:32]=\"TestHash_0\",sortKey[:32]=\"\",sortKeyCount=500,valueLength=-1]"));
+      }
     } catch (Throwable e) {
+      e.printStackTrace();
       fail();
     }
   }
diff --git 
a/java-client/src/test/java/org/apache/pegasus/rpc/async/ReplicaSessionTest.java
 
b/java-client/src/test/java/org/apache/pegasus/rpc/async/ReplicaSessionTest.java
index 66603f052..d33605de4 100644
--- 
a/java-client/src/test/java/org/apache/pegasus/rpc/async/ReplicaSessionTest.java
+++ 
b/java-client/src/test/java/org/apache/pegasus/rpc/async/ReplicaSessionTest.java
@@ -45,7 +45,6 @@ import 
org.apache.pegasus.rpc.interceptor.ReplicaSessionInterceptorManager;
 import org.apache.pegasus.tools.Toollet;
 import org.apache.pegasus.tools.Tools;
 import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TMessage;
 import org.apache.thrift.protocol.TProtocol;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -71,7 +70,7 @@ public class ReplicaSessionTest {
   /** Method: connect() */
   @Test
   public void testConnect() throws Exception {
-    // test1: connect to a invalid address
+    // test1: connect to an invalid address.
     rpc_address addr = new rpc_address();
     addr.fromString("127.0.0.1:12345");
     ReplicaSession rs = manager.getReplicaSession(addr);
@@ -82,12 +81,9 @@ public class ReplicaSessionTest {
       final client_operator op = new rrdb_put_operator(new gpid(-1, -1), "", 
null, 0);
       final FutureTask<Void> cb =
           new FutureTask<Void>(
-              new Callable<Void>() {
-                @Override
-                public Void call() throws Exception {
-                  assertEquals(error_code.error_types.ERR_SESSION_RESET, 
op.rpc_error.errno);
-                  return null;
-                }
+              () -> {
+                assertEquals(error_code.error_types.ERR_SESSION_RESET, 
op.rpc_error.errno);
+                return null;
               });
 
       callbacks.add(cb);
@@ -103,29 +99,16 @@ public class ReplicaSessionTest {
     }
 
     final ReplicaSession cp_rs = rs;
-    Toollet.waitCondition(
-        new Toollet.BoolCallable() {
-          @Override
-          public boolean call() {
-            return ReplicaSession.ConnState.DISCONNECTED == cp_rs.getState();
-          }
-        },
-        5);
-
-    // test2: connect to an valid address, and then close the server
+    Toollet.waitCondition(() -> ReplicaSession.ConnState.DISCONNECTED == 
cp_rs.getState(), 5);
+
+    // test2: connect to a valid address, and then close the server.
     addr.fromString("127.0.0.1:34801");
     callbacks.clear();
 
     rs = manager.getReplicaSession(addr);
-    rs.setMessageResponseFilter(
-        new ReplicaSession.MessageResponseFilter() {
-          @Override
-          public boolean abandonIt(error_code.error_types err, TMessage 
header) {
-            return true;
-          }
-        });
+    rs.setMessageResponseFilter((err, header) -> true);
     for (int i = 0; i < 20; ++i) {
-      // we send query request to replica server. We expect it to discard it.
+      // Send query request to replica server, expect it to be timeout.
       final int index = i;
       update_request req =
           new update_request(new blob("hello".getBytes()), new 
blob("world".getBytes()), 0);
@@ -134,16 +117,13 @@ public class ReplicaSessionTest {
       final rpc_address cp_addr = addr;
       final FutureTask<Void> cb =
           new FutureTask<Void>(
-              new Callable<Void>() {
-                @Override
-                public Void call() throws Exception {
-                  assertEquals(error_code.error_types.ERR_TIMEOUT, 
op.rpc_error.errno);
-                  // for the last request, we kill the server
-                  if (index == 19) {
-                    Toollet.closeServer(cp_addr);
-                  }
-                  return null;
+              () -> {
+                assertEquals(error_code.error_types.ERR_TIMEOUT, 
op.rpc_error.errno);
+                // for the last request, we kill the server
+                if (index == 19) {
+                  Toollet.closeServer(cp_addr);
                 }
+                return null;
               });
 
       callbacks.add(cb);
@@ -151,23 +131,20 @@ public class ReplicaSessionTest {
     }
 
     for (int i = 0; i < 80; ++i) {
-      // then we still send query request to replica server. But the timeout 
is longer.
+      // Re-send query request to replica server, but the timeout is longer.
       update_request req =
           new update_request(new blob("hello".getBytes()), new 
blob("world".getBytes()), 0);
       final client_operator op = new Toollet.test_operator(new gpid(-1, -1), 
req);
       final FutureTask<Void> cb =
           new FutureTask<Void>(
-              new Callable<Void>() {
-                @Override
-                public Void call() throws Exception {
-                  assertEquals(error_code.error_types.ERR_SESSION_RESET, 
op.rpc_error.errno);
-                  return null;
-                }
+              () -> {
+                assertEquals(error_code.error_types.ERR_SESSION_RESET, 
op.rpc_error.errno);
+                return null;
               });
 
       callbacks.add(cb);
-      // these requests have longer timeout, so they should be responsed later 
than the server is
-      // killed
+      // The request has longer timeout, so it should be responsed later than 
the server been
+      // killed.
       rs.asyncSend(op, cb, 2000, false);
     }
 
diff --git a/java-client/src/test/java/org/apache/pegasus/tools/Toollet.java 
b/java-client/src/test/java/org/apache/pegasus/tools/Toollet.java
index 20b39b8ca..fb0c53b2e 100644
--- a/java-client/src/test/java/org/apache/pegasus/tools/Toollet.java
+++ b/java-client/src/test/java/org/apache/pegasus/tools/Toollet.java
@@ -82,7 +82,7 @@ public class Toollet {
       Process process = Runtime.getRuntime().exec(new String[] {"bash", "-c", 
command});
       int result = process.waitFor();
       if (0 != result) {
-        logger.warn("exec command {} failed, error code {}", command, result);
+        logger.warn("exec command '{}' failed, error code {}", command, 
result);
         return false;
       }
       return true;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to