This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 655fc0913 fix(java-client): fix the failed java-client test (#1836)
655fc0913 is described below
commit 655fc09136d4d1dc2949a949bb537c4db705a92f
Author: Yingchun Lai <[email protected]>
AuthorDate: Wed Jan 10 14:16:17 2024 +0800
fix(java-client): fix the failed java-client test (#1836)
Fix the failed java-client tests and improve some of the code style,
including:
- Install `net-tools`, which is needed by the test tool Toollet.java; it uses
`netstat`
to find some processes.
- Improve the flaky test testOperationTimeout.
- Use an independent hashkey for testScanRangeWithValueExpired to avoid
conflicts.
- Improve some code style.
- Update the README.md.
---
.github/workflows/lint_and_test_java-client.yml | 3 +-
java-client/README.md | 51 ++--
.../java/org/apache/pegasus/client/TestBasic.java | 270 ++++++++++++---------
.../pegasus/rpc/async/ReplicaSessionTest.java | 65 ++---
.../java/org/apache/pegasus/tools/Toollet.java | 2 +-
5 files changed, 215 insertions(+), 176 deletions(-)
diff --git a/.github/workflows/lint_and_test_java-client.yml
b/.github/workflows/lint_and_test_java-client.yml
index 119877d0e..a11ebb5ee 100644
--- a/.github/workflows/lint_and_test_java-client.yml
+++ b/.github/workflows/lint_and_test_java-client.yml
@@ -94,8 +94,7 @@ jobs:
- name: Start Pegasus cluster
run: |
apt-get update
- apt-get -y install tree
- tree ${JAVA_HOME}
+ apt-get install net-tools -y
export
LD_LIBRARY_PATH=$(pwd)/thirdparty/output/lib:${JAVA_HOME}/jre/lib/amd64/server
ulimit -s unlimited
./run.sh start_onebox
diff --git a/java-client/README.md b/java-client/README.md
index 44a1cb393..24229cbb8 100644
--- a/java-client/README.md
+++ b/java-client/README.md
@@ -19,43 +19,54 @@ under the License.
# Pegasus Java Client
-## Build
+## Development
+
+### 1. Prepare
```
-cd scripts && sh recompile_thrift.sh && cd -
-mvn spotless:apply
-mvn clean package -DskipTests
+cd scripts && bash recompile_thrift.sh
```
-## Install
+### 2. Format the code
```
-cd scripts && sh recompile_thrift.sh && cd -
mvn spotless:apply
-mvn clean install -DskipTests
```
-## Test
+### 3. Build
+
+```
+mvn clean package -DskipTests
+```
+
+### 4. Test
-To run test, you should start pegasus onebox firstly, and run test as:
+To run test, you should prepare the test environment:
+1. Start [Pegasus onebox](https://pegasus.apache.org/overview/onebox/)
+2. Install some necessary tools, such as `net-tools`
```
-cd scripts && sh recompile_thrift.sh && cd -
-mvn spotless:apply
mvn clean package
```
-or specify one test:
+Or specify one test:
```
-cd scripts && sh recompile_thrift.sh && cd -
-mvn spotless:apply
mvn clean package -Dtest=TestPing
```
-## Configuration
+## Using Pegasus Java client
+
+### Install
+
+```
+cd scripts && bash recompile_thrift.sh && cd -
+mvn clean install -DskipTests
+```
+
+### Configuration
-Configure client by "pegasus.properties", for example:
+Configure client by `pegasus.properties` file, for example:
```
meta_servers = 127.0.0.1:34601,127.0.0.1:34602,127.0.0.1:34603
@@ -66,14 +77,14 @@ perf_counter_tags = k1=v1,k2=v2,k3=v3
push_counter_interval_secs = 10
```
-You can provide a parameter of 'configPath' when creating a client instance.
+You can provide a parameter of `configPath` when creating a client instance.
-The format of 'configPath' should be one of these:
+The format of `configPath` should be one of these:
* zk path: zk://host1:port1,host2:port2,host3:port3/path/to/config
* local file path: file:///path/to/config
* resource path: resource:///path/to/config
-## PerfCounter(Metrics)
+### PerfCounter (Metrics)
Pegasus Java Client supports QPS and latency statistics of requests.
@@ -93,7 +104,7 @@ For each type of request(get, set, multiset, etc.), we
collect 8 metrics:
5. latency-p50: the moving median of request's queries
6. latency-p99: the moving p99 of request's queries
7. lantecy-p999: the moving p999 of request's queries
-8: latency-max: the moving max of request's queries
+8. latency-max: the moving max of request's queries
We use io.dropwizard.metrics library to calculate the request count.
diff --git a/java-client/src/test/java/org/apache/pegasus/client/TestBasic.java
b/java-client/src/test/java/org/apache/pegasus/client/TestBasic.java
index d753299ee..c006ab0b8 100644
--- a/java-client/src/test/java/org/apache/pegasus/client/TestBasic.java
+++ b/java-client/src/test/java/org/apache/pegasus/client/TestBasic.java
@@ -38,9 +38,12 @@ import java.util.Map;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
/** Created by mi on 16-3-22. */
public class TestBasic {
+ private static final Logger logger =
org.slf4j.LoggerFactory.getLogger(TestBasic.class);
+
private static final String basicSetGetDelHashKey =
"TestBasic_testSetGetDel_hash_key_1";
private static final String multiSetGetDelHashKey =
"TestBasic_testMultiSetGetDel_hash_key_1";
private static final String multiGetHashKey =
"TestBasic_testMultiGet_hash_key_1";
@@ -2640,7 +2643,7 @@ public class TestBasic {
@Test // test for making sure return "maxFetchCount" if has "maxFetchCount"
valid record
public void testScanRangeWithValueExpired() throws PException,
InterruptedException {
String tableName = "temp";
- String hashKey = "hashKey";
+ String hashKey = "hashKey_testScanRangeWithValueExpired";
// generate records:
sortKeys=[expired_0....expired_999,persistent_0...persistent_9]
generateRecordsWithExpired(tableName, hashKey, 1000, 10);
@@ -2649,73 +2652,119 @@ public class TestBasic {
// case A: scan all record
// case A1: scan all record: if persistent record count >= maxFetchCount,
it must return
// maxFetchCount records
+ int maxFetchCount = 5;
PegasusTable.ScanRangeResult caseA1 =
- table.scanRange(hashKey.getBytes(), null, null, new ScanOptions(), 5,
0);
- assertScanResult(0, 4, false, caseA1);
+ table.scanRange(hashKey.getBytes(), null, null, new ScanOptions(),
maxFetchCount, 0);
+ assertScanResult(hashKey, 0, 4, false, caseA1);
// case A2: scan all record: if persistent record count < maxFetchCount,
it only return
// persistent count records
+ maxFetchCount = 100;
PegasusTable.ScanRangeResult caseA2 =
- table.scanRange(hashKey.getBytes(), null, null, new ScanOptions(),
100, 0);
- assertScanResult(0, 9, true, caseA2);
+ table.scanRange(hashKey.getBytes(), null, null, new ScanOptions(),
maxFetchCount, 0);
+ assertScanResult(hashKey, 0, 9, true, caseA2);
// case B: scan limit record by "startSortKey" and "":
// case B1: scan limit record by "expired_0" and "", if persistent record
count >=
// maxFetchCount, it must return maxFetchCount records
+ maxFetchCount = 5;
PegasusTable.ScanRangeResult caseB1 =
table.scanRange(
- hashKey.getBytes(), "expired_0".getBytes(), "".getBytes(), new
ScanOptions(), 5, 0);
- assertScanResult(0, 4, false, caseB1);
+ hashKey.getBytes(),
+ "expired_0".getBytes(),
+ "".getBytes(),
+ new ScanOptions(),
+ maxFetchCount,
+ 0);
+ assertScanResult(hashKey, 0, 4, false, caseB1);
// case B2: scan limit record by "expired_0" and "", if persistent record
count < maxFetchCount,
// it only return valid records
+ maxFetchCount = 50;
PegasusTable.ScanRangeResult caseB2 =
table.scanRange(
- hashKey.getBytes(), "expired_0".getBytes(), "".getBytes(), new
ScanOptions(), 50, 0);
- assertScanResult(0, 9, true, caseB2);
+ hashKey.getBytes(),
+ "expired_0".getBytes(),
+ "".getBytes(),
+ new ScanOptions(),
+ maxFetchCount,
+ 0);
+ assertScanResult(hashKey, 0, 9, true, caseB2);
// case B3: scan limit record by "persistent_5" and "", if following
persistent record count <
// maxFetchCount, it only return valid records
+ maxFetchCount = 50;
PegasusTable.ScanRangeResult caseB3 =
table.scanRange(
- hashKey.getBytes(), "persistent_5".getBytes(), "".getBytes(), new
ScanOptions(), 50, 0);
- assertScanResult(5, 9, true, caseB3);
+ hashKey.getBytes(),
+ "persistent_5".getBytes(),
+ "".getBytes(),
+ new ScanOptions(),
+ maxFetchCount,
+ 0);
+ assertScanResult(hashKey, 5, 9, true, caseB3);
// case B4: scan limit record by "persistent_5" and "", if following
persistent record count >
// maxFetchCount, it only return valid records
+ maxFetchCount = 3;
PegasusTable.ScanRangeResult caseB4 =
table.scanRange(
- hashKey.getBytes(), "persistent_5".getBytes(), "".getBytes(), new
ScanOptions(), 3, 0);
- assertScanResult(5, 7, false, caseB4);
+ hashKey.getBytes(),
+ "persistent_5".getBytes(),
+ "".getBytes(),
+ new ScanOptions(),
+ maxFetchCount,
+ 0);
+ assertScanResult(hashKey, 5, 7, false, caseB4);
// case C: scan limit record by "" and "stopSortKey":
// case C1: scan limit record by "" and "expired_7", if will return 0
record
+ maxFetchCount = 3;
PegasusTable.ScanRangeResult caseC1 =
table.scanRange(
- hashKey.getBytes(), "".getBytes(), "expired_7".getBytes(), new
ScanOptions(), 3, 0);
+ hashKey.getBytes(),
+ "".getBytes(),
+ "expired_7".getBytes(),
+ new ScanOptions(),
+ maxFetchCount,
+ 0);
assertTrue(caseC1.allFetched);
assertEquals(0, caseC1.results.size()); // among "" and "expired_7" has 0
valid record
// case C2: scan limit record by "" and "persistent_7", if valid record
count < maxFetchCount,
// it only return valid record
+ maxFetchCount = 10;
PegasusTable.ScanRangeResult caseC2 =
table.scanRange(
- hashKey.getBytes(), "".getBytes(), "persistent_7".getBytes(), new
ScanOptions(), 10, 0);
- assertScanResult(0, 6, true, caseC2);
+ hashKey.getBytes(),
+ "".getBytes(),
+ "persistent_7".getBytes(),
+ new ScanOptions(),
+ maxFetchCount,
+ 0);
+ assertScanResult(hashKey, 0, 6, true, caseC2);
// case C3: scan limit record by "" and "persistent_7", if valid record
count > maxFetchCount,
// it only return valid record
+ maxFetchCount = 2;
PegasusTable.ScanRangeResult caseC3 =
table.scanRange(
- hashKey.getBytes(), "".getBytes(), "persistent_7".getBytes(), new
ScanOptions(), 2, 0);
- assertScanResult(0, 1, false, caseC3);
+ hashKey.getBytes(),
+ "".getBytes(),
+ "persistent_7".getBytes(),
+ new ScanOptions(),
+ maxFetchCount,
+ 0);
+ assertScanResult(hashKey, 0, 1, false, caseC3);
// case D: use multiGetSortKeys, which actually equal with case A but no
value
// case D1: maxFetchCount > 0, return maxFetchCount valid record
+ maxFetchCount = -1;
PegasusTableInterface.MultiGetSortKeysResult caseD1 =
- table.multiGetSortKeys(hashKey.getBytes(), 5, -1, 0);
+ table.multiGetSortKeys(hashKey.getBytes(), 5, maxFetchCount, 0);
assertFalse(caseD1.allFetched);
assertEquals(5, caseD1.keys.size());
for (int i = 0; i <= 4; i++) {
assertEquals("persistent_" + i, new String(caseD1.keys.get(i)));
}
// case D1: maxFetchCount < 0, return all valid record
+ maxFetchCount = -1;
PegasusTableInterface.MultiGetSortKeysResult caseD2 =
- table.multiGetSortKeys(hashKey.getBytes(), 10, -1, 0);
+ table.multiGetSortKeys(hashKey.getBytes(), 10, maxFetchCount, 0);
assertTrue(caseD2.allFetched);
assertEquals(10, caseD2.keys.size());
for (int i = 0; i <= 9; i++) {
@@ -2727,7 +2776,8 @@ public class TestBasic {
String tableName, String hashKey, int expiredCount, int persistentCount)
throws PException, InterruptedException {
PegasusClientInterface client = PegasusClientFactory.getSingletonClient();
- // assign prefix to make sure the expire record is stored front of
persistent
+ // Specify the prefixes to make sure the expired records are stored in
front of the persistent
+ // ones.
String expiredSortKeyPrefix = "expired_";
String persistentSortKeyPrefix = "persistent_";
while (expiredCount-- > 0) {
@@ -2738,7 +2788,7 @@ public class TestBasic {
(expiredSortKeyPrefix + expiredCount + "_value").getBytes(),
1);
}
- // sleep to make sure the record is expired
+ // Sleep a while to make sure the records are expired.
Thread.sleep(1000);
while (persistentCount-- > 0) {
client.set(
@@ -2751,6 +2801,7 @@ public class TestBasic {
}
private void assertScanResult(
+ String hashKey,
int startIndex,
int stopIndex,
boolean expectAllFetched,
@@ -2758,8 +2809,7 @@ public class TestBasic {
assertEquals(expectAllFetched, actuallyRes.allFetched);
assertEquals(stopIndex - startIndex + 1, actuallyRes.results.size());
for (int i = startIndex; i <= stopIndex; i++) {
- assertEquals(
- "hashKey", new String(actuallyRes.results.get(i -
startIndex).getLeft().getKey()));
+ assertEquals(hashKey, new String(actuallyRes.results.get(i -
startIndex).getLeft().getKey()));
assertEquals(
"persistent_" + i,
new String(actuallyRes.results.get(i -
startIndex).getLeft().getValue()));
@@ -2770,101 +2820,103 @@ public class TestBasic {
}
@Test
- public void testRequestDetail() throws PException {
+ public void testOperationTimeout() throws PException {
+ String tableName = "temp";
+
+ // Create a PegasusClientInterface object 'client' which causes exceptions
throw out.
Duration caseTimeout = Duration.ofMillis(1);
ClientOptions client_opt =
ClientOptions.builder().operationTimeout(caseTimeout).build();
-
- PegasusClientFactory.createClient(client_opt);
PegasusClientInterface client =
PegasusClientFactory.createClient(client_opt);
- String tableName = "temp";
- PegasusTableInterface tb = client.openTable(tableName);
- String HashPrefix = "TestHash";
- String SortPrefix = "TestSort";
- String hashKey = HashPrefix + "_0";
- String sortKey = SortPrefix + "_0";
+ // Create a PegasusTableInterface object 'tb' which doesn't cause
exceptions throw out.
+ PegasusClientInterface client2 =
PegasusClientFactory.createClient(ClientOptions.create());
+ PegasusTableInterface tb = client2.openTable(tableName);
+
+ String hashKey = "TestHash_0";
+ String sortKey = "TestSort_0";
try {
- // multiSet timeout
- System.out.println("Test multiSet PException request");
-
- String multiValue2 = RandomStringUtils.random(5, true, true);
- List<Pair<byte[], byte[]>> multiValues2 = new ArrayList<Pair<byte[],
byte[]>>();
- int count2 = 500;
- while (count2-- > 0) {
- multiValues2.add(Pair.of(sortKey.getBytes(), multiValue2.getBytes()));
+ logger.info("Test multiSet PException request");
+ {
+ String value = RandomStringUtils.random(5, true, true);
+ List<Pair<byte[], byte[]>> values = new ArrayList<Pair<byte[],
byte[]>>();
+ int count = 500;
+ while (count-- > 0) {
+ values.add(Pair.of(sortKey.getBytes(), value.getBytes()));
+ }
+
+ Throwable exception =
+ assertThrows(
+ PException.class,
+ () -> {
+ client.multiSet(tableName, hashKey.getBytes(), values);
+ });
+ assertTrue(
+ exception
+ .getMessage()
+ .contains(
+
"request=[hashKey[:32]=\"TestHash_0\",sortKey[:32]=\"\",sortKeyCount=500,valueLength=2500]"));
}
- Throwable exception =
- assertThrows(
- PException.class,
- () -> {
- client.multiSet(tableName, hashKey.getBytes(), multiValues2);
- });
- assertTrue(
- exception
- .getMessage()
- .contains(
-
"request=[hashKey[:32]=\"TestHash_0\",sortKey[:32]=\"\",sortKeyCount=500,valueLength=2500]"));
-
- // checkAndMutate timeout
- System.out.println("Test checkAndMutate PException request");
- Mutations mutations = new Mutations();
- mutations.set(sortKey.getBytes(), "2".getBytes());
-
- CheckAndMutateOptions options = new CheckAndMutateOptions();
- options.returnCheckValue = true;
- Throwable exception2 =
- assertThrows(
- PException.class,
- () -> {
- client.checkAndMutate(
- tableName,
- hashKey.getBytes(),
- "k5".getBytes(),
- CheckType.CT_VALUE_INT_LESS,
- "2".getBytes(),
- mutations,
- options);
- });
- assertTrue(
- exception2
- .getMessage()
- .contains(
-
"request=[hashKey[:32]=\"TestHash_0\",sortKey[:32]=\"k5\",sortKeyCount=1,valueLength=1]"));
-
- // multiDel timeout
- System.out.println("Test multiDel PException request");
- List<Pair<byte[], byte[]>> multiValues3 = new ArrayList<Pair<byte[],
byte[]>>();
- List<byte[]> sortKeys = new ArrayList<byte[]>();
- multiValues3.add(
- Pair.of("basic_test_sort_key_0".getBytes(),
"basic_test_value_0".getBytes()));
- multiValues3.add(
- Pair.of("basic_test_sort_key_1".getBytes(),
"basic_test_value_1".getBytes()));
- multiValues3.add(
- Pair.of("basic_test_sort_key_2".getBytes(),
"basic_test_value_2".getBytes()));
- sortKeys.add("basic_test_sort_key_0".getBytes());
- sortKeys.add("basic_test_sort_key_1".getBytes());
- sortKeys.add("basic_test_sort_key_2".getBytes());
+ logger.info("Test checkAndMutate PException request");
+ {
+ Mutations mutations = new Mutations();
+ mutations.set(sortKey.getBytes(), "2".getBytes());
- tb.multiSet(hashKey.getBytes(), multiValues3, 5000);
- assertDoesNotThrow(
- () -> {
- tb.multiSet(hashKey.getBytes(), multiValues3, 5000);
- });
-
- Throwable exception3 =
- assertThrows(
- PException.class,
- () -> {
- client.multiDel(tableName, hashKey.getBytes(), sortKeys);
- });
- assertTrue(
- exception3
- .getMessage()
- .contains(
-
"request=[hashKey[:32]=\"TestHash_0\",sortKey[:32]=\"\",sortKeyCount=3,valueLength=-1]"));
+ CheckAndMutateOptions options = new CheckAndMutateOptions();
+ options.returnCheckValue = true;
+ Throwable exception =
+ assertThrows(
+ PException.class,
+ () -> {
+ client.checkAndMutate(
+ tableName,
+ hashKey.getBytes(),
+ "k5".getBytes(),
+ CheckType.CT_VALUE_INT_LESS,
+ "2".getBytes(),
+ mutations,
+ options);
+ });
+ assertTrue(
+ exception
+ .getMessage()
+ .contains(
+
"request=[hashKey[:32]=\"TestHash_0\",sortKey[:32]=\"k5\",sortKeyCount=1,valueLength=1]"));
+ }
+
+ logger.info("Test multiDel PException request");
+ {
+ List<byte[]> sortKeys = new ArrayList<byte[]>();
+ List<Pair<byte[], byte[]>> values = new ArrayList<Pair<byte[],
byte[]>>();
+ int count = 500;
+ while (count-- > 0) {
+ String tempSortKey = "basic_test_sort_key_0";
+ sortKeys.add(tempSortKey.getBytes());
+ values.add(Pair.of(tempSortKey.getBytes(),
"basic_test_value_0".getBytes()));
+ }
+
+ // Expect multiSet with a higher timeout will not throw exception.
+ assertDoesNotThrow(
+ () -> {
+ tb.multiSet(hashKey.getBytes(), values, 5000);
+ });
+
+ // Expect multiSet with a lower timeout will throw exception.
+ Throwable exception =
+ assertThrows(
+ PException.class,
+ () -> {
+ client.multiDel(tableName, hashKey.getBytes(), sortKeys);
+ });
+ assertTrue(
+ exception
+ .getMessage()
+ .contains(
+
"request=[hashKey[:32]=\"TestHash_0\",sortKey[:32]=\"\",sortKeyCount=500,valueLength=-1]"));
+ }
} catch (Throwable e) {
+ e.printStackTrace();
fail();
}
}
diff --git
a/java-client/src/test/java/org/apache/pegasus/rpc/async/ReplicaSessionTest.java
b/java-client/src/test/java/org/apache/pegasus/rpc/async/ReplicaSessionTest.java
index 66603f052..d33605de4 100644
---
a/java-client/src/test/java/org/apache/pegasus/rpc/async/ReplicaSessionTest.java
+++
b/java-client/src/test/java/org/apache/pegasus/rpc/async/ReplicaSessionTest.java
@@ -45,7 +45,6 @@ import
org.apache.pegasus.rpc.interceptor.ReplicaSessionInterceptorManager;
import org.apache.pegasus.tools.Toollet;
import org.apache.pegasus.tools.Tools;
import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TMessage;
import org.apache.thrift.protocol.TProtocol;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -71,7 +70,7 @@ public class ReplicaSessionTest {
/** Method: connect() */
@Test
public void testConnect() throws Exception {
- // test1: connect to a invalid address
+ // test1: connect to an invalid address.
rpc_address addr = new rpc_address();
addr.fromString("127.0.0.1:12345");
ReplicaSession rs = manager.getReplicaSession(addr);
@@ -82,12 +81,9 @@ public class ReplicaSessionTest {
final client_operator op = new rrdb_put_operator(new gpid(-1, -1), "",
null, 0);
final FutureTask<Void> cb =
new FutureTask<Void>(
- new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- assertEquals(error_code.error_types.ERR_SESSION_RESET,
op.rpc_error.errno);
- return null;
- }
+ () -> {
+ assertEquals(error_code.error_types.ERR_SESSION_RESET,
op.rpc_error.errno);
+ return null;
});
callbacks.add(cb);
@@ -103,29 +99,16 @@ public class ReplicaSessionTest {
}
final ReplicaSession cp_rs = rs;
- Toollet.waitCondition(
- new Toollet.BoolCallable() {
- @Override
- public boolean call() {
- return ReplicaSession.ConnState.DISCONNECTED == cp_rs.getState();
- }
- },
- 5);
-
- // test2: connect to an valid address, and then close the server
+ Toollet.waitCondition(() -> ReplicaSession.ConnState.DISCONNECTED ==
cp_rs.getState(), 5);
+
+ // test2: connect to a valid address, and then close the server.
addr.fromString("127.0.0.1:34801");
callbacks.clear();
rs = manager.getReplicaSession(addr);
- rs.setMessageResponseFilter(
- new ReplicaSession.MessageResponseFilter() {
- @Override
- public boolean abandonIt(error_code.error_types err, TMessage
header) {
- return true;
- }
- });
+ rs.setMessageResponseFilter((err, header) -> true);
for (int i = 0; i < 20; ++i) {
- // we send query request to replica server. We expect it to discard it.
+ // Send query request to replica server, expect it to be timeout.
final int index = i;
update_request req =
new update_request(new blob("hello".getBytes()), new
blob("world".getBytes()), 0);
@@ -134,16 +117,13 @@ public class ReplicaSessionTest {
final rpc_address cp_addr = addr;
final FutureTask<Void> cb =
new FutureTask<Void>(
- new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- assertEquals(error_code.error_types.ERR_TIMEOUT,
op.rpc_error.errno);
- // for the last request, we kill the server
- if (index == 19) {
- Toollet.closeServer(cp_addr);
- }
- return null;
+ () -> {
+ assertEquals(error_code.error_types.ERR_TIMEOUT,
op.rpc_error.errno);
+ // for the last request, we kill the server
+ if (index == 19) {
+ Toollet.closeServer(cp_addr);
}
+ return null;
});
callbacks.add(cb);
@@ -151,23 +131,20 @@ public class ReplicaSessionTest {
}
for (int i = 0; i < 80; ++i) {
- // then we still send query request to replica server. But the timeout
is longer.
+ // Re-send query request to replica server, but the timeout is longer.
update_request req =
new update_request(new blob("hello".getBytes()), new
blob("world".getBytes()), 0);
final client_operator op = new Toollet.test_operator(new gpid(-1, -1),
req);
final FutureTask<Void> cb =
new FutureTask<Void>(
- new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- assertEquals(error_code.error_types.ERR_SESSION_RESET,
op.rpc_error.errno);
- return null;
- }
+ () -> {
+ assertEquals(error_code.error_types.ERR_SESSION_RESET,
op.rpc_error.errno);
+ return null;
});
callbacks.add(cb);
- // these requests have longer timeout, so they should be responsed later
than the server is
- // killed
+ // The request has longer timeout, so it should be responsed later than
the server been
+ // killed.
rs.asyncSend(op, cb, 2000, false);
}
diff --git a/java-client/src/test/java/org/apache/pegasus/tools/Toollet.java
b/java-client/src/test/java/org/apache/pegasus/tools/Toollet.java
index 20b39b8ca..fb0c53b2e 100644
--- a/java-client/src/test/java/org/apache/pegasus/tools/Toollet.java
+++ b/java-client/src/test/java/org/apache/pegasus/tools/Toollet.java
@@ -82,7 +82,7 @@ public class Toollet {
Process process = Runtime.getRuntime().exec(new String[] {"bash", "-c",
command});
int result = process.waitFor();
if (0 != result) {
- logger.warn("exec command {} failed, error code {}", command, result);
+ logger.warn("exec command '{}' failed, error code {}", command,
result);
return false;
}
return true;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]