This is an automated email from the ASF dual-hosted git repository.

ming pushed a commit to branch master
in repository 
https://gitbox.apache.org/repos/asf/incubator-hugegraph-computer.git


The following commit(s) were added to refs/heads/master by this push:
     new 89a8f9b1 refact(core): adaptor for common 1.2 & fix a string of 
possible CI problem (#286)
89a8f9b1 is described below

commit 89a8f9b1f78ea78b8b9f29ad64ba5aeafff3bfac
Author: imbajin <[email protected]>
AuthorDate: Mon Dec 11 15:07:38 2023 +0800

    refact(core): adaptor for common 1.2 & fix a string of possible CI problem 
(#286)
    
    * chore(ci): update common 1.2 & upgrade action version
    
    * replace HTTP_CODE
    
    * Update codeql-analysis.yml
    
    * Update pom.xml
    
    * use docker image for server
    
    * update webmockserver version to adapt okhttp4.x
    
    * remove useless client
    
    * Revert "remove useless client"
    
    This reverts commit 3e894d34e31ce2a9e0366c0c8dd7cb66af6cc7cb.
    
    * chore: replace loading data with docker & fix AssertThrows
    
    * update loader image name
    
    * Update load-data-into-hugegraph.sh
    
    * Update load-data-into-hugegraph.sh
    
    * Update load-data-into-hugegraph.sh
    
    * change path
    
    * Update load-data-into-hugegraph.sh
    
    * Update load-data-into-hugegraph.sh
    
    * Update load-data-into-hugegraph.sh
    
    * Update load-data-into-hugegraph.sh
    
    * Update load-data-into-hugegraph.sh
    
    * Update load-data-into-hugegraph.sh
    
    * Update load-data-into-hugegraph.sh
    
    * fix name
    
    * Update load-data-into-hugegraph.sh
    
    * update ci logic
    
    * Update load-data-into-hugegraph.sh
    
    * Update load-data-into-hugegraph.sh
    
    * Update load-data-into-hugegraph.sh
    
    * Update WorkerServiceTest.java
    
    * update data path
    
    * refactor WorkerServer
    
    * update  the WokerService init & close logic
    
    * Update MasterService.java
    
    * CI passed & revert the ignore
    
    * Update MessageRecvManagerTest.java
    
    * enable 3rd party check
    
    * Update license-checker.yml
    
    * Update 
computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerService.java
    
    Co-authored-by: Cong Zhao <[email protected]>
    
    * Update MasterService.java
    
    * tiny fix
    
    ---------
    
    Co-authored-by: Cong Zhao <[email protected]>
---
 .github/configs/settings.xml                       |  60 ++++++
 .github/workflows/ci.yml                           |  30 +--
 .github/workflows/codeql-analysis.yml              |  70 ++++---
 .github/workflows/license-checker.yml              |  50 ++---
 .github/workflows/stale.yml                        |   2 +-
 checkstyle.xml                                     |   6 +-
 .../hugegraph/computer/core/common/Constants.java  |   2 +-
 .../hugegraph/computer/core/bsp/EtcdClient.java    |  93 ++++-----
 .../computer/core/master/MasterService.java        |  42 ++--
 .../core/network/session/ClientSession.java        |  10 +-
 .../computer/core/worker/WorkerService.java        | 187 +++++++++---------
 computer-dist/src/assembly/dataset/struct.json     |   6 +-
 .../travis/install-hugegraph-from-source.sh        |   3 +
 computer-dist/src/assembly/travis/install-k8s.sh   |   1 +
 .../assembly/travis/load-data-into-hugegraph.sh    |  44 +++--
 computer-dist/src/assembly/travis/start-etcd.sh    |   3 +-
 .../computer/k8s/operator/OperatorEntrypoint.java  |  50 +++--
 computer-test/pom.xml                              |  32 +++
 .../network/netty/NettyTransportClientTest.java    | 110 ++++-------
 .../core/receiver/MessageRecvManagerTest.java      |  46 ++---
 .../computer/core/worker/WorkerServiceTest.java    | 214 +++++++++------------
 .../computer/k8s/KubernetesDriverTest.java         | 104 ++++------
 pom.xml                                            |  26 ++-
 23 files changed, 615 insertions(+), 576 deletions(-)

diff --git a/.github/configs/settings.xml b/.github/configs/settings.xml
new file mode 100644
index 00000000..294ded1c
--- /dev/null
+++ b/.github/configs/settings.xml
@@ -0,0 +1,60 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+  -->
+
+<settings xmlns="http://maven.apache.org/SETTINGS/1.0.0";
+          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+          xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0 
https://maven.apache.org/xsd/settings-1.0.0.xsd";>
+    <servers>
+        <server>
+            <id>github</id>
+            <username>${env.GITHUB_ACTOR}</username>
+            <password>${env.GITHUB_TOKEN}</password>
+        </server>
+    </servers>
+
+    <profiles>
+        <profile>
+            <id>local-repo</id>
+            <repositories>
+                <repository>
+                    <id>central</id>
+                    <url>https://repo.maven.apache.org/maven2</url>
+                    <releases>
+                        <enabled>true</enabled>
+                    </releases>
+                    <snapshots>
+                        <enabled>false</enabled>
+                    </snapshots>
+                </repository>
+                <repository>
+                    <id>staged-releases</id>
+                    
<url>https://repository.apache.org/content/groups/staging/</url>
+                </repository>
+            </repositories>
+            <pluginRepositories>
+                <pluginRepository>
+                    <id>staged-releases</id>
+                    
<url>https://repository.apache.org/content/groups/staging/</url>
+                </pluginRepository>
+            </pluginRepositories>
+        </profile>
+    </profiles>
+
+    <activeProfiles>
+        <activeProfile>local-repo</activeProfile>
+    </activeProfiles>
+</settings>
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index e64ec7d7..5cfd38a1 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -11,14 +11,16 @@ jobs:
   computer-ci:
     runs-on: ubuntu-latest
     env:
+      USE_STAGE: 'true' # Whether to include the stage repository.
       TRAVIS_DIR: computer-dist/src/assembly/travis
       KUBERNETES_VERSION: 1.20.1
-      HUGEGRAPH_SERVER_COMMIT_ID: d01c8737d7d5909119671953521f1401dcd1a188
       BSP_ETCD_URL: http://localhost:2579
+      # TODO: delete this env in the future (replaced by docker way now)
+      HUGEGRAPH_SERVER_COMMIT_ID: d01c8737d7d5909119671953521f1401dcd1a188
 
     steps:
       - name: Checkout
-        uses: actions/checkout@v3
+        uses: actions/checkout@v4
         with:
           fetch-depth: 2
 
@@ -57,12 +59,6 @@ jobs:
       - name: Setup Minikube-Kubernetes
         run: $TRAVIS_DIR/install-k8s.sh
 
-      - name: Check Component
-        run: |
-          sleep 5
-          curl localhost:9000
-          kubectl get nodes
-
       - name: Cache Maven packages
         uses: actions/cache@v3
         with:
@@ -70,11 +66,16 @@ jobs:
           key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
           restore-keys: ${{ runner.os }}-m2
 
+      - name: Check Component
+        run: |
+          curl localhost:9000
+          kubectl get nodes
+
       - name: Prepare env and service
         run: |
           $TRAVIS_DIR/install-env.sh
-          $TRAVIS_DIR/install-hugegraph-from-source.sh 
$HUGEGRAPH_SERVER_COMMIT_ID
           $TRAVIS_DIR/load-data-into-hugegraph.sh
+          #$TRAVIS_DIR/install-hugegraph-from-source.sh 
$HUGEGRAPH_SERVER_COMMIT_ID
 
       - name: Install JDK 11
         uses: actions/setup-java@v3
@@ -82,8 +83,14 @@ jobs:
           java-version: '11'
           distribution: 'zulu'
 
+      - name: Use staged maven repo
+        if: ${{ env.USE_STAGE == 'true' }}
+        run: |
+          cp $HOME/.m2/settings.xml /tmp/settings.xml
+          mv -vf .github/configs/settings.xml $HOME/.m2/settings.xml
+
       - name: Compile
-        run: mvn clean compile -Dmaven.javadoc.skip=true -ntp
+        run: mvn clean compile -e -Dmaven.javadoc.skip=true -ntp
 
       - name: Integrate test
         run: mvn test -P integrate-test -ntp
@@ -92,6 +99,7 @@ jobs:
         run: mvn test -P unit-test -ntp
 
       - name: Upload coverage to Codecov
-        uses: codecov/[email protected]
+        uses: codecov/codecov-action@v3
         with:
+          token: ${{ secrets.CODECOV_TOKEN }}
           file: target/site/jacoco/jacoco.xml
diff --git a/.github/workflows/codeql-analysis.yml 
b/.github/workflows/codeql-analysis.yml
index 7bc30f62..32bebd0a 100644
--- a/.github/workflows/codeql-analysis.yml
+++ b/.github/workflows/codeql-analysis.yml
@@ -4,13 +4,15 @@ name: "CodeQL"
 
 on:
   pull_request:
-    # The branches below must be a subset of the branches above, now enable it 
in all PR
-    # branches: [ master ]
+  # The branches below must be a subset of the branches above, now enable it 
in all PR
+  # branches: [ master ]
   schedule:
     - cron: '45 7 * * 1'
 
 jobs:
   analyze:
+    env:
+      USE_STAGE: 'true' # Whether to include the stage repository.
     name: Analyze
     runs-on: ubuntu-latest
     permissions:
@@ -24,43 +26,55 @@ jobs:
         language: [ 'go', 'java' ]
 
     steps:
-    - name: Checkout repository
-      uses: actions/checkout@v3
+      - name: Checkout repository
+        uses: actions/checkout@v4
 
-    # Initializes the CodeQL tools for scanning.
-    - name: Initialize CodeQL
-      uses: github/codeql-action/init@v2
-      with:
-        languages: ${{ matrix.language }}
-        # If you wish to specify custom queries, you can do so here or in a 
config file.
-        # By default, queries listed here will override any specified in a 
config file.
-        # Prefix the list here with "+" to use these queries and those in the 
config file.
-        # queries: ./path/to/local/query, your-org/your-repo/queries@main
+      - name: Setup Java JDK
+        uses: actions/setup-java@v3
+        with:
+          distribution: 'zulu'
+          java-version: '11'
 
-    # Autobuild attempts to build any compiled languages  (C/C++, C#, or Java).
-    # If this step fails, then you should remove it and run the build manually 
(see below)
-    - name: Autobuild
-      uses: github/codeql-action/autobuild@v2
+      - name: use staged maven repo settings
+        if: ${{ env.USE_STAGE == 'true' }}
+        run: |
+          cp $HOME/.m2/settings.xml /tmp/settings.xml
+          mv -vf .github/configs/settings.xml $HOME/.m2/settings.xml
 
-    # ℹ️ Command-line programs to run using the OS shell.
-    # 📚 https://git.io/JvXDl
+      # Initializes the CodeQL tools for scanning.
+      - name: Initialize CodeQL
+        uses: github/codeql-action/init@v2
+        with:
+          languages: ${{ matrix.language }}
+          # If you wish to specify custom queries, you can do so here or in a 
config file.
+          # By default, queries listed here will override any specified in a 
config file.
+          # Prefix the list here with "+" to use these queries and those in 
the config file.
+          # queries: ./path/to/local/query, your-org/your-repo/queries@main
 
-    # ✏️ If the Autobuild fails above, remove it and uncomment the following 
three lines
-    #    and modify them (or add more) to build your code if your project
-    #    uses a compiled language
+      # Autobuild attempts to build any compiled languages (C/C++, C#, or 
Java).
+      # If this step fails, then you should remove it and run the build 
manually (see below)
+      - name: Autobuild
+        uses: github/codeql-action/autobuild@v2
 
-    #- run: |
-    #   make bootstrap
-    #   make release
+      # ℹ️ Command-line programs to run using the OS shell.
+      # 📚 https://git.io/JvXDl
 
-    - name: Perform CodeQL Analysis
-      uses: github/codeql-action/analyze@v2
+      # ✏️ If the Autobuild fails above, remove it and uncomment the following 
three lines
+      #    and modify them (or add more) to build your code if your project
+      #    uses a compiled language
+
+      #- run: |
+      #   make bootstrap
+      #   make release
+
+      - name: Perform CodeQL Analysis
+        uses: github/codeql-action/analyze@v2
 
   dependency-review:
     runs-on: ubuntu-latest
     steps:
       - name: 'Checkout Repository'
-        uses: actions/checkout@v3
+        uses: actions/checkout@v4
       - name: 'Dependency Review'
         uses: actions/dependency-review-action@v3
         
diff --git a/.github/workflows/license-checker.yml 
b/.github/workflows/license-checker.yml
index ea2c3687..caa3f85f 100644
--- a/.github/workflows/license-checker.yml
+++ b/.github/workflows/license-checker.yml
@@ -15,7 +15,7 @@ jobs:
   check-license-header:
     runs-on: ubuntu-latest
     steps:
-      - uses: actions/checkout@v3
+      - uses: actions/checkout@v4
       # More info could refer to: https://github.com/apache/skywalking-eyes
       - name: Check License Header
         uses: apache/skywalking-eyes@main
@@ -36,25 +36,29 @@ jobs:
           find ./ -name rat.txt -print0 | xargs -0 -I file cat file > 
merged-rat.txt
           grep "Binaries" merged-rat.txt -C 3 && cat merged-rat.txt
 
-#  TODO: enable it later
-#  check-dependency-license:
-#    runs-on: ubuntu-latest
-#    env:
-#      SCRIPT_DEPENDENCY: computer-dist/scripts/dependency
-#    steps:
-#      - name: Checkout source
-#        uses: actions/checkout@v3
-#      - name: Set up JDK 11
-#        uses: actions/setup-java@v3
-#        with:
-#          java-version: '11'
-#          distribution: 'adopt'
-#      - name: mvn install
-#        run: |
-#          mvn install -DskipTests=true -ntp
-#      - name: generate current dependencies
-#        run: |
-#          bash $SCRIPT_DEPENDENCY/regenerate_known_dependencies.sh 
current-dependencies.txt
-#      - name: check third dependencies
-#        run: |
-#          bash $SCRIPT_DEPENDENCY/check_dependencies.sh
+  check-dependency-license:
+    runs-on: ubuntu-latest
+    env:
+      SCRIPT_DEPENDENCY: computer-dist/scripts/dependency
+      USE_STAGE: 'true' # Whether to include the stage repository.
+    steps:
+      - name: Checkout source
+        uses: actions/checkout@v4
+      - name: Set up JDK 11
+        uses: actions/setup-java@v3
+        with:
+          java-version: '11'
+          distribution: 'adopt'
+      - name: Use staged maven repo settings
+        if: ${{ env.USE_STAGE == 'true' }}
+        run: |
+          cp $HOME/.m2/settings.xml /tmp/settings.xml
+          mv -vf .github/configs/settings.xml $HOME/.m2/settings.xml
+      - name: Compile install
+        run: mvn package -DskipTests=true -ntp
+
+      # TODO: enable it after the check scripts are ready, lack them now
+      #- name: Generate & check current 3rd-party dependencies
+      #  run: |
+      #    bash $SCRIPT_DEPENDENCY/regenerate_known_dependencies.sh 
current-dependencies.txt
+      #    bash $SCRIPT_DEPENDENCY/check_dependencies.sh
diff --git a/.github/workflows/stale.yml b/.github/workflows/stale.yml
index 3ba4f72b..5b5cdc21 100644
--- a/.github/workflows/stale.yml
+++ b/.github/workflows/stale.yml
@@ -13,7 +13,7 @@ jobs:
       pull-requests: write
 
     steps:
-    - uses: actions/stale@v3
+    - uses: actions/stale@v8
       with:
         repo-token: ${{ secrets.GITHUB_TOKEN }}
         stale-issue-message: 'Due to the lack of activity, the current issue 
is marked as stale and will be closed after 20 days, any update will remove the 
stale label'
diff --git a/checkstyle.xml b/checkstyle.xml
index 96156a36..ef00578c 100644
--- a/checkstyle.xml
+++ b/checkstyle.xml
@@ -35,7 +35,8 @@
     <module name="TreeWalker">
         <!--检查行长度-->
         <module name="LineLength">
-            <property name="max" value="100"/>
+            <!-- Use 101 due to some legacy code reason -->
+            <property name="max" value="101"/>
             <!--可以忽略的行-->
             <property name="ignorePattern"
                       value="^package.*|^import.*|a 
href|href|http://|https://|ftp://"/>
@@ -84,7 +85,8 @@
         <module name="WhitespaceAround"/>
         <!--左圆括号之后和右圆括号之前是否需要有一个空格,不需要-->
         <module name="ParenPad"/>
-        
<!--检查修饰符是否符合Java建议,顺序是:public、protected、private、abstract、default、static、final、transient、volatile、synchronized、native、strictfp-->
+        
<!--检查修饰符是否符合Java建议,顺序是:public、protected、private、abstract、default、static、
+        final、transient、volatile、synchronized、native、strictfp-->
         <module name="ModifierOrder"/>
         <!--检查代码块的左花括号的放置位置,必须在当前行的末尾-->
         <module name="LeftCurly">
diff --git 
a/computer-api/src/main/java/org/apache/hugegraph/computer/core/common/Constants.java
 
b/computer-api/src/main/java/org/apache/hugegraph/computer/core/common/Constants.java
index 73658f0a..846ccaac 100644
--- 
a/computer-api/src/main/java/org/apache/hugegraph/computer/core/common/Constants.java
+++ 
b/computer-api/src/main/java/org/apache/hugegraph/computer/core/common/Constants.java
@@ -59,7 +59,7 @@ public final class Constants {
     public static final int FUTURE_TIMEOUT = 300;
 
     /*
-     * The timeout in millisecond for threadpool shutdown
+     * The timeout in millisecond for thread-pool shutdown
      */
     public static final long SHUTDOWN_TIMEOUT = 5000L;
 
diff --git 
a/computer-core/src/main/java/org/apache/hugegraph/computer/core/bsp/EtcdClient.java
 
b/computer-core/src/main/java/org/apache/hugegraph/computer/core/bsp/EtcdClient.java
index c4681be1..ce97cb48 100644
--- 
a/computer-core/src/main/java/org/apache/hugegraph/computer/core/bsp/EtcdClient.java
+++ 
b/computer-core/src/main/java/org/apache/hugegraph/computer/core/bsp/EtcdClient.java
@@ -65,8 +65,7 @@ public class EtcdClient {
                                "The endpoints can't be null");
         E.checkArgumentNotNull(namespace,
                                "The namespace can't be null");
-        ByteSequence namespaceSeq = ByteSequence.from(namespace.getBytes(
-                                                                ENCODING));
+        ByteSequence namespaceSeq = 
ByteSequence.from(namespace.getBytes(ENCODING));
         this.client = Client.builder().endpoints(endpoints)
                             .namespace(namespaceSeq).build();
         this.watch = this.client.getWatchClient();
@@ -80,20 +79,14 @@ public class EtcdClient {
      * @param value value to be associated with the specified key
      */
     public void put(String key, byte[] value) {
-        E.checkArgument(key != null,
-                        "The key can't be null.");
-        E.checkArgument(value != null,
-                        "The value can't be null.");
+        E.checkArgument(key != null, "The key can't be null.");
+        E.checkArgument(value != null, "The value can't be null.");
         try {
-            this.kv.put(ByteSequence.from(key, ENCODING),
-                        ByteSequence.from(value))
-                   .get();
+            this.kv.put(ByteSequence.from(key, ENCODING), 
ByteSequence.from(value)).get();
         } catch (InterruptedException e) {
-            throw new ComputerException(
-                      "Interrupted while putting with key='%s'", e, key);
+            throw new ComputerException("Interrupted while putting with 
key='%s'", e, key);
         } catch (ExecutionException e) {
-            throw new ComputerException("Error while putting with key='%s'",
-                                        e, key);
+            throw new ComputerException("Error while putting with key='%s'", 
e, key);
         }
     }
 
@@ -110,8 +103,8 @@ public class EtcdClient {
      * Returns the value to which the specified key is mapped.
      * @param key The key to be found
      * @param throwException whether to throw ComputerException if not found.
-     * @return the value of specified key, null if not found and
-     * throwException is set false
+     * @return the value of the specified key, null if not found,
+     * and throwException is set false
      * @throws ComputerException if not found and throwException is set true
      */
     public byte[] get(String key, boolean throwException) {
@@ -124,24 +117,20 @@ public class EtcdClient {
                 assert kvs.size() == 1;
                 return kvs.get(0).getValue().getBytes();
             } else if (throwException) {
-                 throw new ComputerException("Can't find value for key='%s'",
-                                             key);
+                 throw new ComputerException("Can't find value for key='%s'", 
key);
             } else {
                 return null;
             }
         } catch (InterruptedException e) {
-            throw new ComputerException(
-                      "Interrupted while getting with key='%s'", e, key);
+            throw new ComputerException("Interrupted while getting with 
key='%s'", e, key);
         } catch (ExecutionException e) {
-            throw new ComputerException("Error while getting with key='%s'",
-                                        e, key);
+            throw new ComputerException("Error while getting with key='%s'", 
e, key);
         }
     }
 
     /**
      * Returns the value to which the specified key is mapped. If no
-     * key exists, wait at most timeout milliseconds. Or throw
-     * ComputerException if timeout
+     * key exists, wait for most time out milliseconds. Or throw 
ComputerException if timeout
      * @param key the key whose associated value is to be returned.
      * @param timeout the max time in milliseconds to wait.
      * @return the specified value in byte array to which the specified key is
@@ -149,11 +138,9 @@ public class EtcdClient {
      */
     public byte[] get(String key, long timeout, long logInterval) {
         E.checkArgumentNotNull(key, "The key can't be null");
-        E.checkArgument(timeout > 0L,
-                        "The timeout must be > 0, but got: %s", timeout);
-        E.checkArgument(logInterval > 0L,
-                        "The logInterval must be > 0, but got: %s",
-                        logInterval);
+        E.checkArgument(timeout > 0L, "The timeout must be > 0, but got: %s", 
timeout);
+        E.checkArgument(logInterval > 0L, "The logInterval must be > 0, but 
got: %s", logInterval);
+
         ByteSequence keySeq = ByteSequence.from(key, ENCODING);
         try {
             GetResponse response = this.kv.get(keySeq).get();
@@ -162,16 +149,12 @@ public class EtcdClient {
                 return kvs.get(0).getValue().getBytes();
             } else {
                 long revision = response.getHeader().getRevision();
-                return this.waitAndGetFromPutEvent(keySeq, revision,
-                                                   timeout, logInterval);
+                return this.waitAndGetFromPutEvent(keySeq, revision, timeout, 
logInterval);
             }
         } catch (InterruptedException e) {
-            throw new ComputerException(
-                      "Interrupted while getting with key='%s'",
-                      e, key);
+            throw new ComputerException("Interrupted while getting with 
key='%s'", e, key);
         } catch (ExecutionException e) {
-            throw new ComputerException("Error while getting with key='%s'",
-                                        e, key);
+            throw new ComputerException("Error while getting with key='%s'", 
e, key);
         }
     }
 
@@ -214,8 +197,7 @@ public class EtcdClient {
                                              .withRevision(revision)
                                              .withNoDelete(true)
                                              .build();
-        try (Watch.Watcher watcher = this.watch.watch(keySeq, watchOption,
-                                                      consumer)) {
+        try (Watch.Watcher ignored = this.watch.watch(keySeq, watchOption, 
consumer)) {
             return barrierEvent.await(timeout, logInterval, () -> {
                 LOG.info("Wait for key '{}' with timeout {}ms",
                          keySeq.toString(ENCODING), timeout);
@@ -225,7 +207,7 @@ public class EtcdClient {
 
     /**
      * Get the values of keys with the specified prefix.
-     * If no key found, return empty list.
+     * If no key is found, return an empty list.
      */
     public List<byte[]> getWithPrefix(String prefix) {
         E.checkArgumentNotNull(prefix, "The prefix can't be null");
@@ -251,7 +233,7 @@ public class EtcdClient {
 
     /**
      * Get the expected count of values of keys with the specified prefix.
-     * Throws ComputerException if there are no enough object.
+     * Throws ComputerException if there are no enough objects.
      */
     public List<byte[]> getWithPrefix(String prefix, int count) {
         E.checkArgumentNotNull(prefix,
@@ -284,12 +266,12 @@ public class EtcdClient {
     }
 
     /**
-     * Get expected count of values with the key prefix with prefix. If there
-     * is no count of keys, wait at most timeout milliseconds.
+     * Get the expected count of values with the key prefix with prefix.
+     * If there is no count of keys, wait at max timeout milliseconds.
      * @param prefix the key prefix
-     * @param count the expected count of values to be get
+     * @param count the expected count of values to be got
      * @param timeout the max wait time
-     * @param logInterval the interval in ms to log message
+     * @param logInterval the interval in ms to log a message
      * @return the list of values which key with specified prefix
      */
     public List<byte[]> getWithPrefix(String prefix, int count,
@@ -329,8 +311,8 @@ public class EtcdClient {
 
     /**
      * Wait at most expected eventCount events triggered in timeout ms.
-     * This method wait at most timeout ms regardless whether expected
-     * eventCount events triggered.
+     * This method waits at most timeout ms regardless of whether 
expected-eventCount events
+     * triggered.
      * @param existedKeyValues readonly
      */
     private List<byte[]> waitAndPrefixGetFromPutEvent(
@@ -368,21 +350,18 @@ public class EtcdClient {
                                              .withPrefix(prefixSeq)
                                              .withRevision(revision)
                                              .build();
-        try (Watch.Watcher watcher = this.watch.watch(prefixSeq,
-                                                      watchOption,
-                                                      consumer)) {
+        try (Watch.Watcher ignored = this.watch.watch(prefixSeq, watchOption, 
consumer)) {
             return barrierEvent.await(timeout, logInterval, () -> {
                 LOG.info("Wait for keys with prefix '{}' and timeout {}ms, " +
                          "expect {} keys but actual got {} keys",
-                         prefixSeq.toString(ENCODING),
-                         timeout, count, keyValues.size());
+                         prefixSeq.toString(ENCODING), timeout, count, 
keyValues.size());
             });
         }
     }
 
     /**
      * @return 1 if deleted specified key, 0 if not found specified key
-     * The deleted data can be get through revision, if revision is compacted,
+     * The deleted data can be got through revision, if revision is compacted,
      * throw exception "etcdserver: mvcc: required revision has been 
compacted".
      * @see <a href="https://etcd.io/docs/v3.4.0/op-guide/maintenance/";>
      *      Maintenance</a>
@@ -391,12 +370,10 @@ public class EtcdClient {
         E.checkArgumentNotNull(key, "The key can't be null");
         ByteSequence keySeq = ByteSequence.from(key, ENCODING);
         try {
-            DeleteResponse response = this.client.getKVClient().delete(keySeq)
-                                                 .get();
+            DeleteResponse response = 
this.client.getKVClient().delete(keySeq).get();
             return response.getDeleted();
         } catch (InterruptedException e) {
-            throw new ComputerException("Interrupted while deleting '%s'",
-                                        e, key);
+            throw new ComputerException("Interrupted while deleting '%s'", e, 
key);
         } catch (ExecutionException e) {
             throw new ComputerException("Error while deleting '%s'", e, key);
         }
@@ -408,12 +385,10 @@ public class EtcdClient {
     public long deleteWithPrefix(String prefix) {
         E.checkArgumentNotNull(prefix, "The prefix can't be null");
         ByteSequence prefixSeq = ByteSequence.from(prefix, ENCODING);
-        DeleteOption deleteOption = DeleteOption.newBuilder()
-                                                .withPrefix(prefixSeq).build();
+        DeleteOption deleteOption = 
DeleteOption.newBuilder().withPrefix(prefixSeq).build();
         try {
             DeleteResponse response = this.client.getKVClient()
-                                                 .delete(prefixSeq,
-                                                         deleteOption)
+                                                 .delete(prefixSeq, 
deleteOption)
                                                  .get();
             return response.getDeleted();
         } catch (InterruptedException e) {
diff --git 
a/computer-core/src/main/java/org/apache/hugegraph/computer/core/master/MasterService.java
 
b/computer-core/src/main/java/org/apache/hugegraph/computer/core/master/MasterService.java
index 06fb7564..da01fa7b 100644
--- 
a/computer-core/src/main/java/org/apache/hugegraph/computer/core/master/MasterService.java
+++ 
b/computer-core/src/main/java/org/apache/hugegraph/computer/core/master/MasterService.java
@@ -66,11 +66,10 @@ public class MasterService implements Closeable {
     private Config config;
     private volatile Bsp4Master bsp4Master;
     private ContainerInfo masterInfo;
-    private List<ContainerInfo> workers;
     private int maxSuperStep;
     private MasterComputation masterComputation;
 
-    private volatile ShutdownHook shutdownHook;
+    private final ShutdownHook shutdownHook;
     private volatile Thread serviceThread;
 
     public MasterService() {
@@ -101,7 +100,7 @@ public class MasterService implements Closeable {
                                             rpcAddress.getPort());
         /*
          * Connect to BSP server and clean the old data may be left by the
-         * previous job with same job id.
+         * previous job with the same job id.
          */
         this.bsp4Master = new Bsp4Master(this.config);
         this.bsp4Master.clean();
@@ -114,9 +113,9 @@ public class MasterService implements Closeable {
         LOG.info("{} register MasterService", this);
         this.bsp4Master.masterInitDone(this.masterInfo);
 
-        this.workers = this.bsp4Master.waitWorkersInitDone();
+        List<ContainerInfo> workers = this.bsp4Master.waitWorkersInitDone();
         LOG.info("{} waited all workers registered, workers count: {}",
-                 this, this.workers.size());
+                 this, workers.size());
 
         LOG.info("{} MasterService initialized", this);
         this.inited = true;
@@ -141,24 +140,36 @@ public class MasterService implements Closeable {
     }
 
     /**
-     * Stop the the master service. Stop the managers created in
-     * {@link #init(Config)}.
+     * Stop the master service. Stop the managers created in {@link 
#init(Config)}.
      */
     @Override
     public synchronized void close() {
-        this.checkInited();
+        // TODO: check the logic of close carefully later
+        //this.checkInited();
         if (this.closed) {
             LOG.info("{} MasterService had closed before", this);
             return;
         }
 
-        this.masterComputation.close(new DefaultMasterContext());
+        try {
+            if (this.masterComputation != null) {
+                this.masterComputation.close(new DefaultMasterContext());
+            }
+        } catch (Exception e) {
+            LOG.error("Error occurred while closing master service", e);
+        }
 
-        if (!failed) {
+        if (!failed && this.bsp4Master != null) {
             this.bsp4Master.waitWorkersCloseDone();
         }
 
-        this.managers.closeAll(this.config);
+        try {
+            if (managers != null) {
+                this.managers.closeAll(this.config);
+            }
+        } catch (Exception e) {
+            LOG.error("Error occurred while closing managers", e);
+        }
 
         this.cleanAndCloseBsp();
         this.shutdownHook.unhook();
@@ -333,7 +344,7 @@ public class MasterService implements Closeable {
      * 1): Has run maxSuperStep times of superstep iteration.
      * 2): The mater-computation returns false that stop superstep iteration.
      * 3): All vertices are inactive and no message sent in a superstep.
-     * @param masterContinue The master-computation decide
+     * @param masterContinue The master-computation decides
      * @return true if finish superstep iteration.
      */
     private boolean finishedIteration(boolean masterContinue,
@@ -351,7 +362,7 @@ public class MasterService implements Closeable {
 
     /**
      * Coordinate with workers to load vertices and edges from HugeGraph. There
-     * are two phases in inputstep. First phase is get input splits from
+     * are two phases in inputstep. The First phase is to get input splits from
      * master, and read the vertices and edges from input splits. The second
      * phase is after all workers read input splits, the workers merge the
      * vertices and edges to get the stats for each partition.
@@ -371,8 +382,7 @@ public class MasterService implements Closeable {
     }
 
     /**
-     * Wait the workers write result back. After this, the job is finished
-     * successfully.
+     * Wait the workers write a result back. After this, the job is finished 
successfully.
      */
     private void outputstep() {
         LOG.info("{} MasterService outputstep started", this);
@@ -400,7 +410,7 @@ public class MasterService implements Closeable {
                             "The aggregator class can't be null");
             Aggregator<V> aggr;
             try {
-                aggr = aggregatorClass.newInstance();
+                aggr = aggregatorClass.getDeclaredConstructor().newInstance();
             } catch (Exception e) {
                 throw new ComputerException("Can't new instance from class: 
%s",
                                             e, aggregatorClass.getName());
diff --git 
a/computer-core/src/main/java/org/apache/hugegraph/computer/core/network/session/ClientSession.java
 
b/computer-core/src/main/java/org/apache/hugegraph/computer/core/network/session/ClientSession.java
index 6f904d88..44732ad4 100644
--- 
a/computer-core/src/main/java/org/apache/hugegraph/computer/core/network/session/ClientSession.java
+++ 
b/computer-core/src/main/java/org/apache/hugegraph/computer/core/network/session/ClientSession.java
@@ -133,11 +133,9 @@ public class ClientSession extends TransportSession {
         } catch (Throwable e) {
             this.stateEstablished();
             if (e instanceof TimeoutException) {
-                throw new TransportException(
-                          "Timeout(%sms) to wait finish-response", timeout);
+                throw new TransportException("Timeout(%sms) to wait 
finish-response", timeout);
             } else {
-                throw new TransportException("Failed to wait finish-response",
-                                             e);
+                throw new TransportException("Failed to wait finish-response", 
e);
             }
         } finally {
             this.finishedFutureRef.compareAndSet(finishFuture, null);
@@ -150,13 +148,11 @@ public class ClientSession extends TransportSession {
                         "at finishAsync()", this.state);
 
         CompletableFuture<Void> finishedFuture = new CompletableFuture<>();
-        boolean success = this.finishedFutureRef.compareAndSet(null,
-                                                               finishedFuture);
+        boolean success = this.finishedFutureRef.compareAndSet(null, 
finishedFuture);
         E.checkArgument(success, "The finishedFutureRef value must be null " +
                                  "at finishAsync()");
 
         int finishId = this.genFinishId();
-
         this.stateFinishSent(finishId);
         try {
             FinishMessage finishMessage = new FinishMessage(finishId);
diff --git 
a/computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerService.java
 
b/computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerService.java
index fc5b5dc5..8d776b32 100644
--- 
a/computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerService.java
+++ 
b/computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerService.java
@@ -61,22 +61,20 @@ public class WorkerService implements Closeable {
 
     private static final Logger LOG = Log.logger(WorkerService.class);
 
+    private volatile boolean inited;
+    private volatile boolean closed;
+
     private final ComputerContext context;
-    private final Managers managers;
     private final Map<Integer, ContainerInfo> workers;
+    private final Managers managers;
+    private final ShutdownHook shutdownHook;
 
-    private volatile boolean inited;
-    private volatile boolean closed;
-    private Config config;
     private Bsp4Worker bsp4Worker;
+    private Config config;
     private ComputeManager computeManager;
     private ContainerInfo workerInfo;
-
     private Combiner<Value> combiner;
 
-    private ContainerInfo masterInfo;
-
-    private volatile ShutdownHook shutdownHook;
     private volatile Thread serviceThread;
 
     public WorkerService() {
@@ -91,57 +89,67 @@ public class WorkerService implements Closeable {
     /**
      * Init worker service, create the managers used by worker service.
      */
-    public void init(Config config) {
-        E.checkArgument(!this.inited, "The %s has been initialized", this);
-
-        this.serviceThread = Thread.currentThread();
-        this.registerShutdownHook();
-
-        this.config = config;
-
-        this.workerInfo = new ContainerInfo();
-        LOG.info("{} Start to initialize worker", this);
-
-        this.bsp4Worker = new Bsp4Worker(this.config, this.workerInfo);
+    public synchronized void init(Config config) {
+        try {
+            LOG.info("{} Prepare to init WorkerService", this);
+            // TODO: what will happen if init() called by multiple threads?
+            E.checkArgument(!this.inited, "The %s has been initialized", this);
 
-        /*
-         * Keep the waitMasterInitDone() called before initManagers(),
-         * in order to ensure master init() before worker managers init()
-         */
-        this.masterInfo = this.bsp4Worker.waitMasterInitDone();
+            this.serviceThread = Thread.currentThread();
+            this.registerShutdownHook();
+            this.config = config;
+            this.workerInfo = new ContainerInfo();
 
-        InetSocketAddress address = this.initManagers(this.masterInfo);
-        this.workerInfo.updateAddress(address);
+            LOG.info("{} Start to initialize worker", this);
+            this.bsp4Worker = new Bsp4Worker(this.config, this.workerInfo);
+            /*
+             * Keep the waitMasterInitDone() called before initManagers(),
+             * in order to ensure master init() before worker managers init()
+             */
+            ContainerInfo masterInfo = this.bsp4Worker.waitMasterInitDone();
+            InetSocketAddress address = this.initManagers(masterInfo);
+            this.workerInfo.updateAddress(address);
+            this.loadComputation();
+
+            LOG.info("{} register WorkerService", this);
+            this.bsp4Worker.workerInitDone();
+            this.connectToWorkers();
+
+            this.computeManager = new ComputeManager(this.workerInfo.id(), 
this.context,
+                                                     this.managers);
+
+            this.managers.initedAll(this.config);
+            LOG.info("{} WorkerService initialized", this);
+            this.inited = true;
+        } catch (Exception e) {
+            LOG.error("Error while initializing WorkerService", e);
+            // TODO: shall we call close() here?
+            throw e;
+        }
+    }
 
+    private void loadComputation() {
         Computation<?> computation = this.config.createObject(
-                                     ComputerOptions.WORKER_COMPUTATION_CLASS);
+                ComputerOptions.WORKER_COMPUTATION_CLASS);
         LOG.info("Loading computation '{}' in category '{}'",
                  computation.name(), computation.category());
 
-        this.combiner = this.config.createObject(
-                        ComputerOptions.WORKER_COMBINER_CLASS, false);
+        this.combiner = 
this.config.createObject(ComputerOptions.WORKER_COMBINER_CLASS, false);
         if (this.combiner == null) {
-            LOG.info("None combiner is provided for computation '{}'",
-                     computation.name());
+            LOG.info("None combiner is provided for computation '{}'", 
computation.name());
         } else {
             LOG.info("Combiner '{}' is provided for computation '{}'",
                      this.combiner.name(), computation.name());
         }
+    }
 
-        LOG.info("{} register WorkerService", this);
-        this.bsp4Worker.workerInitDone();
+    private void connectToWorkers() {
         List<ContainerInfo> workers = this.bsp4Worker.waitMasterAllInitDone();
         DataClientManager dm = this.managers.get(DataClientManager.NAME);
         for (ContainerInfo worker : workers) {
             this.workers.put(worker.id(), worker);
             dm.connect(worker.id(), worker.hostname(), worker.dataPort());
         }
-
-        this.computeManager = new ComputeManager(this.workerInfo.id(), 
this.context, this.managers);
-
-        this.managers.initedAll(this.config);
-        LOG.info("{} WorkerService initialized", this);
-        this.inited = true;
     }
 
     private void registerShutdownHook() {
@@ -152,29 +160,50 @@ public class WorkerService implements Closeable {
     }
 
     /**
-     * Stop the worker service. Stop the managers created in
-     * {@link #init(Config)}.
+     * Stop the worker service. Stop the managers created in {@link 
#init(Config)}.
      */
     @Override
     public synchronized void close() {
-        this.checkInited();
+        // TODO: why checkInited() here, if init throws exception, how to 
close the resource?
+        //this.checkInited();
         if (this.closed) {
             LOG.info("{} WorkerService had closed before", this);
             return;
         }
 
-        this.computeManager.close();
+        try {
+            if (this.computeManager != null) {
+                this.computeManager.close();
+            } else {
+                LOG.warn("The computeManager is null");
+                return;
+            }
+        } catch (Exception e) {
+            LOG.error("Error when closing ComputeManager", e);
+        }
         /*
          * Seems managers.closeAll() would do the following actions:
          * TODO: close the connection to other workers.
          * TODO: stop the connection to the master
          * TODO: stop the data transportation server.
          */
-        this.managers.closeAll(this.config);
+        try {
+            this.managers.closeAll(this.config);
+        } catch (Exception e) {
+            LOG.error("Error while closing managers", e);
+        }
 
-        this.bsp4Worker.workerCloseDone();
-        this.bsp4Worker.close();
-        this.shutdownHook.unhook();
+        try {
+            this.bsp4Worker.workerCloseDone();
+            this.bsp4Worker.close();
+        } catch (Exception e) {
+            LOG.error("Error while closing bsp4Worker", e);
+        }
+        try {
+            this.shutdownHook.unhook();
+        } catch (Exception e) {
+            LOG.error("Error while unhooking shutdownHook", e);
+        }
 
         this.closed = true;
         LOG.info("{} WorkerService closed", this);
@@ -201,14 +230,13 @@ public class WorkerService implements Closeable {
     }
 
     /**
-     * Execute the superstep in worker. It first wait master witch superstep
+     * Execute the superstep in worker. It first waits master witch superstep
      * to start from. And then do the superstep iteration until master's
      * superstepStat is inactive.
      */
     public void execute() {
-        this.checkInited();
-
         LOG.info("{} WorkerService execute", this);
+        this.checkInited();
 
         // TODO: determine superstep if fail over is enabled.
         int superstep = this.bsp4Worker.waitMasterResumeDone();
@@ -227,8 +255,7 @@ public class WorkerService implements Closeable {
          * superstep.
          */
         while (superstepStat.active()) {
-            WorkerContext context = new SuperstepContext(superstep,
-                                                         superstepStat);
+            WorkerContext context = new SuperstepContext(superstep, 
superstepStat);
             LOG.info("Start computation of superstep {}", superstep);
             if (superstep > 0) {
                 this.computeManager.takeRecvedMessages();
@@ -242,13 +269,12 @@ public class WorkerService implements Closeable {
             this.managers.beforeSuperstep(this.config, superstep);
 
             /*
-             * Notify master by each worker, when the master received all
+             * Notify the master by each worker, when the master received all
              * workers signal, then notify all workers to do compute().
              */
             this.bsp4Worker.workerStepPrepareDone(superstep);
             this.bsp4Worker.waitMasterStepPrepareDone(superstep);
-            WorkerStat workerStat = this.computeManager.compute(context,
-                                                                superstep);
+            WorkerStat workerStat = this.computeManager.compute(context, 
superstep);
 
             this.bsp4Worker.workerStepComputeDone(superstep);
             this.bsp4Worker.waitMasterStepComputeDone(superstep);
@@ -274,8 +300,7 @@ public class WorkerService implements Closeable {
 
     @Override
     public String toString() {
-        Object id = this.workerInfo == null ?
-                    "?" + this.hashCode() : this.workerInfo.id();
+        Object id = this.workerInfo == null ? "?" + this.hashCode() : 
this.workerInfo.id();
         return String.format("[worker %s]", id);
     }
 
@@ -287,13 +312,11 @@ public class WorkerService implements Closeable {
          * NOTE: this init() method will be called twice, will be ignored at
          * the 2nd time call.
          */
-        WorkerRpcManager.updateRpcRemoteServerConfig(this.config,
-                                                     masterInfo.hostname(),
+        WorkerRpcManager.updateRpcRemoteServerConfig(this.config, 
masterInfo.hostname(),
                                                      masterInfo.rpcPort());
         rpcManager.init(this.config);
 
-        WorkerAggrManager aggregatorManager = new WorkerAggrManager(
-                                              this.context);
+        WorkerAggrManager aggregatorManager = new 
WorkerAggrManager(this.context);
         aggregatorManager.service(rpcManager.aggregateRpcService());
         this.managers.add(aggregatorManager);
         FileManager fileManager = new FileManager();
@@ -307,30 +330,22 @@ public class WorkerService implements Closeable {
         this.managers.add(recvManager);
 
         ConnectionManager connManager = new TransportConnectionManager();
-        DataServerManager serverManager = new DataServerManager(connManager,
-                                                                recvManager);
+        DataServerManager serverManager = new DataServerManager(connManager, 
recvManager);
         this.managers.add(serverManager);
 
-        DataClientManager clientManager = new DataClientManager(connManager,
-                                                                this.context);
+        DataClientManager clientManager = new DataClientManager(connManager, 
this.context);
         this.managers.add(clientManager);
 
         SortManager sendSortManager = new SendSortManager(this.context);
         this.managers.add(sendSortManager);
 
-        MessageSendManager sendManager = new MessageSendManager(this.context,
-                                         sendSortManager,
-                                         clientManager.sender());
+        MessageSendManager sendManager = new MessageSendManager(this.context, 
sendSortManager,
+                                                                
clientManager.sender());
         this.managers.add(sendManager);
-
-        SnapshotManager snapshotManager = new SnapshotManager(this.context,
-                                                              sendManager,
-                                                              recvManager,
-                                                              this.workerInfo);
+        SnapshotManager snapshotManager = new SnapshotManager(this.context, 
sendManager,
+                                                              recvManager, 
this.workerInfo);
         this.managers.add(snapshotManager);
-
-        WorkerInputManager inputManager = new WorkerInputManager(this.context,
-                                                                 sendManager,
+        WorkerInputManager inputManager = new WorkerInputManager(this.context, 
sendManager,
                                                                  
snapshotManager);
         inputManager.service(rpcManager.inputSplitService());
         this.managers.add(inputManager);
@@ -339,8 +354,8 @@ public class WorkerService implements Closeable {
         this.managers.initAll(this.config);
 
         InetSocketAddress address = serverManager.address();
-        LOG.info("{} WorkerService initialized managers with data server " +
-                 "address '{}'", this, address);
+        LOG.info("{} WorkerService initialized managers with data server 
address '{}'",
+                 this, address);
         return address;
     }
 
@@ -350,7 +365,7 @@ public class WorkerService implements Closeable {
 
     /**
      * Load vertices and edges from HugeGraph. There are two phases in
-     * inputstep. First phase is get input splits from master, and read the
+     * inputstep. The First phase is to get input splits from master, and read 
the
      * vertices and edges from input splits. The second phase is after all
      * workers read input splits, the workers merge the vertices and edges to
      * get the stats for each partition.
@@ -365,10 +380,8 @@ public class WorkerService implements Closeable {
 
         WorkerStat workerStat = this.computeManager.input();
 
-        this.bsp4Worker.workerStepDone(Constants.INPUT_SUPERSTEP,
-                                       workerStat);
-        SuperstepStat superstepStat = this.bsp4Worker.waitMasterStepDone(
-                                      Constants.INPUT_SUPERSTEP);
+        this.bsp4Worker.workerStepDone(Constants.INPUT_SUPERSTEP, workerStat);
+        SuperstepStat superstepStat = 
this.bsp4Worker.waitMasterStepDone(Constants.INPUT_SUPERSTEP);
         manager.close(this.config);
         LOG.info("{} WorkerService inputstep finished", this);
         return superstepStat;
@@ -395,10 +408,8 @@ public class WorkerService implements Closeable {
         private SuperstepContext(int superstep, SuperstepStat superstepStat) {
             this.superstep = superstep;
             this.superstepStat = superstepStat;
-            this.aggrManager = WorkerService.this.managers.get(
-                               WorkerAggrManager.NAME);
-            this.sendManager = WorkerService.this.managers.get(
-                               MessageSendManager.NAME);
+            this.aggrManager = 
WorkerService.this.managers.get(WorkerAggrManager.NAME);
+            this.sendManager = 
WorkerService.this.managers.get(MessageSendManager.NAME);
         }
 
         @Override
diff --git a/computer-dist/src/assembly/dataset/struct.json 
b/computer-dist/src/assembly/dataset/struct.json
index ed571ca4..c1d8be1e 100644
--- a/computer-dist/src/assembly/dataset/struct.json
+++ b/computer-dist/src/assembly/dataset/struct.json
@@ -6,7 +6,7 @@
       "skip": false,
       "input": {
         "type": "FILE",
-        "path": 
"computer-dist/src/assembly/dataset/ml-latest-small/ratings.csv",
+        "path": "/dataset/ml-latest-small/ratings.csv",
         "file_filter": {
           "extensions": [
             "*"
@@ -83,7 +83,7 @@
       "skip": false,
       "input": {
         "type": "FILE",
-        "path": "computer-dist/src/assembly/dataset/ml-latest-small/tags.csv",
+        "path": "/dataset/ml-latest-small/tags.csv",
         "file_filter": {
           "extensions": [
             "*"
@@ -160,7 +160,7 @@
       "skip": false,
       "input": {
         "type": "FILE",
-        "path": 
"computer-dist/src/assembly/dataset/ml-latest-small/movies.csv",
+        "path": "/dataset/ml-latest-small/movies.csv",
         "file_filter": {
           "extensions": [
             "*"
diff --git a/computer-dist/src/assembly/travis/install-hugegraph-from-source.sh 
b/computer-dist/src/assembly/travis/install-hugegraph-from-source.sh
index af5281de..b5365caa 100755
--- a/computer-dist/src/assembly/travis/install-hugegraph-from-source.sh
+++ b/computer-dist/src/assembly/travis/install-hugegraph-from-source.sh
@@ -15,6 +15,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+
+# Note: this script is not used in github-ci now, keep it for other env
 set -ev
 
 if [[ $# -ne 1 ]]; then
@@ -40,4 +42,5 @@ chmod -R 755 bin/
 
 bin/init-store.sh || exit 1
 bin/start-hugegraph.sh || cat logs/hugegraph-server.log
+
 cd ../
diff --git a/computer-dist/src/assembly/travis/install-k8s.sh 
b/computer-dist/src/assembly/travis/install-k8s.sh
index 4619fa91..d18f4928 100755
--- a/computer-dist/src/assembly/travis/install-k8s.sh
+++ b/computer-dist/src/assembly/travis/install-k8s.sh
@@ -17,6 +17,7 @@
 #
 set -ev
 
+# TODO: could replace by docker way
 curl -Lo minikube 
https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64 && 
chmod +x minikube
 sudo mkdir -p /usr/local/bin/
 sudo install minikube /usr/local/bin/
diff --git a/computer-dist/src/assembly/travis/load-data-into-hugegraph.sh 
b/computer-dist/src/assembly/travis/load-data-into-hugegraph.sh
index e00890b6..b448b661 100755
--- a/computer-dist/src/assembly/travis/load-data-into-hugegraph.sh
+++ b/computer-dist/src/assembly/travis/load-data-into-hugegraph.sh
@@ -21,27 +21,43 @@ set -ev
 TRAVIS_DIR=$(dirname "$0")
 DATASET_DIR=${TRAVIS_DIR}/../dataset
 
-HUGEGRAPH_LOADER_GIT_URL="https://github.com/apache/hugegraph-toolchain.git";
-
-git clone --depth 10 ${HUGEGRAPH_LOADER_GIT_URL} hugegraph-toolchain
-
-cd hugegraph-toolchain
-mvn install -P stage -pl hugegraph-client,hugegraph-loader -am -DskipTests -ntp
-
-cd hugegraph-loader
-tar -zxf target/apache-hugegraph-loader-*.tar.gz || exit 1
-cd ../../
+docker network create ci
+# Note: we need wait for server start finished, so start it first
+docker run -itd --name=graph --network ci -p 8080:8080 
hugegraph/hugegraph:latest && sleep 6
 
 wget http://files.grouplens.org/datasets/movielens/ml-latest-small.zip
 unzip -d ${DATASET_DIR} ml-latest-small.zip
 
-hugegraph-toolchain/hugegraph-loader/apache-hugegraph-loader-*/bin/hugegraph-loader.sh
 \
--g hugegraph -f ${DATASET_DIR}/struct.json -s ${DATASET_DIR}/schema.groovy || 
exit 1
+cd ${DATASET_DIR}/.. && pwd && ls -lh *
+
+docker run -id --name=loader --network ci hugegraph/loader:latest
+docker cp dataset loader:/dataset || exit 1
+
+docker exec -i loader ls -lh /dataset
+docker exec -i loader bin/hugegraph-loader.sh -g hugegraph -p 8080 -h graph \
+    -f /dataset/struct.json -s /dataset/schema.groovy || exit 1
 
 # load dataset to hdfs
-sort -t , -k1n -u "${DATASET_DIR}"/ml-latest-small/ratings.csv | cut -d "," -f 
1 > "${DATASET_DIR}"/ml-latest-small/user_id.csv || exit 1
+sort -t , -k1n -u dataset/ml-latest-small/ratings.csv | cut -d "," -f 1 
>dataset/ml-latest-small/user_id.csv || exit 1
 /opt/hadoop/bin/hadoop fs -mkdir -p /dataset/ml-latest-small || exit 1
-/opt/hadoop/bin/hadoop fs -put "${DATASET_DIR}"/ml-latest-small/* 
/dataset/ml-latest-small || exit 1
+/opt/hadoop/bin/hadoop fs -put dataset/ml-latest-small/* 
/dataset/ml-latest-small || exit 1
 /opt/hadoop/bin/hadoop fs -ls /dataset/ml-latest-small
 
 echo "Load finished, continue to next step"
+
+############# Note: this part is not used in github-ci now, backup it for 
other env ##############
+#HUGEGRAPH_LOADER_GIT_URL="https://github.com/apache/hugegraph-toolchain.git";
+#git clone --depth 10 ${HUGEGRAPH_LOADER_GIT_URL} hugegraph-toolchain
+#
+#cd hugegraph-toolchain
+#mvn install -P stage -pl hugegraph-client,hugegraph-loader -am -DskipTests 
-ntp
+#
+#cd hugegraph-loader
+#tar -zxf target/apache-hugegraph-loader-*.tar.gz || exit 1
+#cd ../../
+
+#wget http://files.grouplens.org/datasets/movielens/ml-latest-small.zip
+#unzip -d ${DATASET_DIR} ml-latest-small.zip
+
+#hugegraph-toolchain/hugegraph-loader/apache-hugegraph-loader-*/bin/hugegraph-loader.sh
 \
+#    -g hugegraph -f ${DATASET_DIR}/struct.json -s 
${DATASET_DIR}/schema.groovy || exit 1
diff --git a/computer-dist/src/assembly/travis/start-etcd.sh 
b/computer-dist/src/assembly/travis/start-etcd.sh
index 896d0832..fe3893ac 100644
--- a/computer-dist/src/assembly/travis/start-etcd.sh
+++ b/computer-dist/src/assembly/travis/start-etcd.sh
@@ -17,8 +17,9 @@
 #
 set -ev
 
-TRAVIS_DIR=`dirname $0`
+TRAVIS_DIR=$(dirname $0)
 echo "Starting etcd..."
+# TODO: replace with docker way
 wget -O ${TRAVIS_DIR}/etcd.tar.gz 
https://github.com/etcd-io/etcd/releases/download/v3.5.0/etcd-v3.5.0-linux-amd64.tar.gz
 mkdir ${TRAVIS_DIR}/etcd
 tar -zxvf ${TRAVIS_DIR}/etcd.tar.gz -C ${TRAVIS_DIR}/etcd --strip-components 1
diff --git 
a/computer-k8s-operator/src/main/java/org/apache/hugegraph/computer/k8s/operator/OperatorEntrypoint.java
 
b/computer-k8s-operator/src/main/java/org/apache/hugegraph/computer/k8s/operator/OperatorEntrypoint.java
index 9934f513..c440551a 100644
--- 
a/computer-k8s-operator/src/main/java/org/apache/hugegraph/computer/k8s/operator/OperatorEntrypoint.java
+++ 
b/computer-k8s-operator/src/main/java/org/apache/hugegraph/computer/k8s/operator/OperatorEntrypoint.java
@@ -19,6 +19,7 @@ package org.apache.hugegraph.computer.k8s.operator;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.net.HttpURLConnection;
 import java.net.InetSocketAddress;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
@@ -36,7 +37,6 @@ import java.util.concurrent.ExecutorService;
 
 import org.apache.commons.configuration2.MapConfiguration;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.http.HttpStatus;
 import org.apache.hugegraph.computer.k8s.Constants;
 import org.apache.hugegraph.computer.k8s.operator.common.AbstractController;
 import org.apache.hugegraph.computer.k8s.operator.config.OperatorOptions;
@@ -61,6 +61,11 @@ import 
io.fabric8.kubernetes.client.NamespacedKubernetesClient;
 import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
 import io.fabric8.kubernetes.client.utils.Utils;
 
+/**
+ * The OperatorEntrypoint class is the main entry point for the Kubernetes 
operator.
+ * It sets up the Kubernetes client, registers controllers, and starts the 
HTTP server for
+ * health checks.
+ */
 public class OperatorEntrypoint {
 
     private static final Logger LOG = Log.logger(OperatorEntrypoint.class);
@@ -74,15 +79,13 @@ public class OperatorEntrypoint {
 
     public static void main(String[] args) {
         OperatorEntrypoint operatorEntrypoint = new OperatorEntrypoint();
-        Runtime.getRuntime().addShutdownHook(
-                new Thread(operatorEntrypoint::shutdown));
+        Runtime.getRuntime().addShutdownHook(new 
Thread(operatorEntrypoint::shutdown));
         operatorEntrypoint.start();
     }
 
     static {
-        OptionSpace.register(
-              "computer-k8s-operator",
-              
"org.apache.hugegraph.computer.k8s.operator.config.OperatorOptions"
+        OptionSpace.register("computer-k8s-operator",
+                             
"org.apache.hugegraph.computer.k8s.operator.config.OperatorOptions"
         );
     }
 
@@ -98,8 +101,7 @@ public class OperatorEntrypoint {
     public void start() {
         try {
             this.kubeClient = new DefaultKubernetesClient();
-            String watchNamespace = this.config.get(
-                                    OperatorOptions.WATCH_NAMESPACE);
+            String watchNamespace = 
this.config.get(OperatorOptions.WATCH_NAMESPACE);
             if (!Objects.equals(watchNamespace, Constants.ALL_NAMESPACE)) {
                 this.createNamespace(watchNamespace);
                 this.kubeClient = this.kubeClient.inNamespace(watchNamespace);
@@ -108,19 +110,17 @@ public class OperatorEntrypoint {
             LOG.info("Watch namespace: " + watchNamespace);
 
             this.addHealthCheck();
-
             this.registerControllers();
 
             this.informerFactory.startAllRegisteredInformers();
             this.informerFactory.addSharedInformerEventListener(exception -> {
-                LOG.error("Informer event listener exception occurred",
-                          exception);
+                LOG.error("Informer event listener exception occurred", 
exception);
                 OperatorEntrypoint.this.shutdown();
             });
 
-            // Start all controller
-            this.controllerPool = ExecutorUtil.newFixedThreadPool(
-                                  this.controllers.size(), "controllers-%d");
+            // Start all controllers
+            this.controllerPool = 
ExecutorUtil.newFixedThreadPool(this.controllers.size(),
+                                                                  
"controllers-%d");
             CountDownLatch latch = new CountDownLatch(this.controllers.size());
             List<CompletableFuture<Void>> futures = new ArrayList<>();
             for (AbstractController<?> controller : this.controllers) {
@@ -141,8 +141,7 @@ public class OperatorEntrypoint {
                 }
             });
 
-            CompletableFuture.anyOf(futures.toArray(new CompletableFuture[]{}))
-                             .get();
+            CompletableFuture.anyOf(futures.toArray(new 
CompletableFuture[]{})).get();
         } catch (Throwable throwable) {
             LOG.error("Failed to start Operator: ", throwable);
         } finally {
@@ -201,16 +200,14 @@ public class OperatorEntrypoint {
     }
 
     private void registerControllers() {
-        ComputerJobController jobController = new ComputerJobController(
-                                              this.config, this.kubeClient);
-        this.registerController(jobController,
-                                ConfigMap.class, Job.class, Pod.class);
+        ComputerJobController jobController = new 
ComputerJobController(this.config,
+                                                                        
this.kubeClient);
+        this.registerController(jobController, ConfigMap.class, Job.class, 
Pod.class);
     }
 
     @SafeVarargs
-    private final void registerController(
-                       AbstractController<?> controller,
-                       Class<? extends HasMetadata>... ownsClass) {
+    private void registerController(AbstractController<?> controller,
+                                    Class<? extends HasMetadata>... ownsClass) 
{
         controller.register(this.informerFactory, ownsClass);
         this.controllers.add(controller);
     }
@@ -222,7 +219,7 @@ public class OperatorEntrypoint {
         this.httpServer = HttpServer.create(address, probeBacklog);
         this.httpServer.createContext("/health", httpExchange -> {
             byte[] bytes = "ALL GOOD!".getBytes(StandardCharsets.UTF_8);
-            httpExchange.sendResponseHeaders(HttpStatus.SC_OK, bytes.length);
+            httpExchange.sendResponseHeaders(HttpURLConnection.HTTP_OK, 
bytes.length);
             OutputStream responseBody = httpExchange.getResponseBody();
             responseBody.write(bytes);
             responseBody.close();
@@ -233,7 +230,7 @@ public class OperatorEntrypoint {
     private void addReadyCheck() {
         this.httpServer.createContext("/ready", httpExchange -> {
             byte[] bytes = "ALL Ready!".getBytes(StandardCharsets.UTF_8);
-            httpExchange.sendResponseHeaders(HttpStatus.SC_OK, bytes.length);
+            httpExchange.sendResponseHeaders(HttpURLConnection.HTTP_OK, 
bytes.length);
             OutputStream responseBody = httpExchange.getResponseBody();
             responseBody.write(bytes);
             responseBody.close();
@@ -245,8 +242,7 @@ public class OperatorEntrypoint {
                                                          .withName(namespace)
                                                          .endMetadata();
         KubeUtil.ignoreExists(() -> {
-            return this.kubeClient.namespaces()
-                                  .create(builder.build());
+            return this.kubeClient.namespaces().create(builder.build());
         });
     }
 }
diff --git a/computer-test/pom.xml b/computer-test/pom.xml
index 4b55da32..16f17a7f 100644
--- a/computer-test/pom.xml
+++ b/computer-test/pom.xml
@@ -38,6 +38,12 @@
         <dependency>
             <groupId>org.apache.hugegraph</groupId>
             <artifactId>hugegraph-common</artifactId>
+            <exclusions>
+                <exclusion>
+                    <artifactId>kotlin-stdlib-jdk8</artifactId>
+                    <groupId>org.jetbrains.kotlin</groupId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.hugegraph</groupId>
@@ -96,11 +102,37 @@
             <version>3.8.0</version>
             <scope>compile</scope>
         </dependency>
+        <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>mockwebserver</artifactId>
+            <version>4.12.0</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <artifactId>junit</artifactId>
+                    <groupId>junit</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>kotlin-stdlib</artifactId>
+                    <groupId>org.jetbrains.kotlin</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
         <dependency>
             <groupId>io.fabric8</groupId>
             <artifactId>kubernetes-server-mock</artifactId>
             <version>5.6.0</version>
             <scope>compile</scope>
+            <exclusions>
+                <exclusion>
+                    <artifactId>mockwebserver</artifactId>
+                    <groupId>com.squareup.okhttp3</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>jackson-databind</artifactId>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                </exclusion>
+            </exclusions>
         </dependency>
     </dependencies>
 
diff --git 
a/computer-test/src/main/java/org/apache/hugegraph/computer/core/network/netty/NettyTransportClientTest.java
 
b/computer-test/src/main/java/org/apache/hugegraph/computer/core/network/netty/NettyTransportClientTest.java
index acf48bab..6d132a83 100644
--- 
a/computer-test/src/main/java/org/apache/hugegraph/computer/core/network/netty/NettyTransportClientTest.java
+++ 
b/computer-test/src/main/java/org/apache/hugegraph/computer/core/network/netty/NettyTransportClientTest.java
@@ -53,18 +53,12 @@ public class NettyTransportClientTest extends 
AbstractNetworkTest {
 
     @Override
     protected void initOption() {
-        super.updateOption(ComputerOptions.TRANSPORT_MAX_PENDING_REQUESTS,
-                           8);
-        super.updateOption(ComputerOptions.TRANSPORT_MIN_PENDING_REQUESTS,
-                           6);
-        super.updateOption(ComputerOptions.TRANSPORT_WRITE_BUFFER_HIGH_MARK,
-                           64 * (int) Bytes.MB);
-        super.updateOption(ComputerOptions.TRANSPORT_WRITE_BUFFER_LOW_MARK,
-                           32 * (int) Bytes.MB);
-        super.updateOption(ComputerOptions.TRANSPORT_MIN_ACK_INTERVAL,
-                           200L);
-        super.updateOption(ComputerOptions.TRANSPORT_FINISH_SESSION_TIMEOUT,
-                           30_000L);
+        super.updateOption(ComputerOptions.TRANSPORT_MAX_PENDING_REQUESTS, 8);
+        super.updateOption(ComputerOptions.TRANSPORT_MIN_PENDING_REQUESTS, 6);
+        super.updateOption(ComputerOptions.TRANSPORT_WRITE_BUFFER_HIGH_MARK, 
64 * (int) Bytes.MB);
+        super.updateOption(ComputerOptions.TRANSPORT_WRITE_BUFFER_LOW_MARK, 32 
* (int) Bytes.MB);
+        super.updateOption(ComputerOptions.TRANSPORT_MIN_ACK_INTERVAL, 200L);
+        super.updateOption(ComputerOptions.TRANSPORT_FINISH_SESSION_TIMEOUT, 
30_000L);
     }
 
     @Test
@@ -124,12 +118,9 @@ public class NettyTransportClientTest extends 
AbstractNetworkTest {
         NettyTransportClient client = (NettyTransportClient) this.oneClient();
         for (int i = 0; i < 3; i++) {
             client.startSession();
-            client.send(MessageType.MSG, 1,
-                        ByteBuffer.wrap(StringEncoding.encode("test1")));
-            client.send(MessageType.VERTEX, 2,
-                        ByteBuffer.wrap(StringEncoding.encode("test2")));
-            client.send(MessageType.EDGE, 3,
-                        ByteBuffer.wrap(StringEncoding.encode("test3")));
+            client.send(MessageType.MSG, 1, 
ByteBuffer.wrap(StringEncoding.encode("test1")));
+            client.send(MessageType.VERTEX, 2, 
ByteBuffer.wrap(StringEncoding.encode("test2")));
+            client.send(MessageType.EDGE, 3, 
ByteBuffer.wrap(StringEncoding.encode("test3")));
             client.finishSession();
         }
     }
@@ -172,8 +163,7 @@ public class NettyTransportClientTest extends 
AbstractNetworkTest {
             Assert.assertNotSame(sourceBytes, bytes);
 
             return null;
-        }).when(serverHandler).handle(Mockito.any(), Mockito.eq(1),
-                                      Mockito.any());
+        }).when(serverHandler).handle(Mockito.any(), Mockito.eq(1), 
Mockito.any());
 
         client.startSession();
         client.send(MessageType.MSG, 1, ByteBuffer.wrap(sourceBytes1));
@@ -187,35 +177,25 @@ public class NettyTransportClientTest extends 
AbstractNetworkTest {
         NettyTransportClient client = (NettyTransportClient) this.oneClient();
 
         Function<Message, ChannelFuture> sendFunc = message -> null;
-        Whitebox.setInternalState(client.clientSession(),
-                                  "sendFunction", sendFunc);
+        Whitebox.setInternalState(client.clientSession(), "sendFunction", 
sendFunc);
 
-        Assert.assertThrows(TransportException.class, () -> {
-            client.startSession();
-        }, e -> {
-            Assert.assertContains("to wait start-response",
-                                  e.getMessage());
+        Assert.assertThrows(TransportException.class, client::startSession, e 
-> {
+            Assert.assertContains("to wait start-response", e.getMessage());
         });
     }
 
     @Test
     public void testFinishSessionWithTimeout() throws IOException {
         NettyTransportClient client = (NettyTransportClient) this.oneClient();
-
         client.startSession();
 
         Function<Message, ChannelFuture> sendFunc = message -> null;
-        Whitebox.setInternalState(client.clientSession(),
-                                  "sendFunction", sendFunc);
+        Whitebox.setInternalState(client.clientSession(), "sendFunction", 
sendFunc);
 
-        Whitebox.setInternalState(client, "timeoutFinishSession",
-                                  1000L);
+        Whitebox.setInternalState(client, "timeoutFinishSession", 1000L);
 
-        Assert.assertThrows(TransportException.class, () -> {
-            client.finishSession();
-        }, e -> {
-            Assert.assertContains("to wait finish-response",
-                                  e.getMessage());
+        Assert.assertThrows(TransportException.class, client::finishSession, e 
-> {
+            Assert.assertContains("to wait finish-response", e.getMessage());
         });
     }
 
@@ -224,18 +204,14 @@ public class NettyTransportClientTest extends 
AbstractNetworkTest {
         NettyTransportClient client = (NettyTransportClient) this.oneClient();
 
         @SuppressWarnings("unchecked")
-        Function<Message, ChannelFuture> sendFunc =
-                                         Mockito.mock(Function.class);
-        Whitebox.setInternalState(client.clientSession(),
-                                  "sendFunction", sendFunc);
+        Function<Message, ChannelFuture> sendFunc = 
Mockito.mock(Function.class);
+        Whitebox.setInternalState(client.clientSession(), "sendFunction", 
sendFunc);
 
         Mockito.doThrow(new RuntimeException("test exception"))
                .when(sendFunc)
                .apply(Mockito.any());
 
-        Assert.assertThrows(RuntimeException.class, () -> {
-            client.startSession();
-        }, e -> {
+        Assert.assertThrows(RuntimeException.class, client::startSession, e -> 
{
             Assert.assertContains("test exception", e.getMessage());
         });
     }
@@ -243,38 +219,31 @@ public class NettyTransportClientTest extends 
AbstractNetworkTest {
     @Test
     public void testFinishSessionWithSendException() throws IOException {
         NettyTransportClient client = (NettyTransportClient) this.oneClient();
-
         client.startSession();
 
         @SuppressWarnings("unchecked")
         Function<Message, Future<Void>> sendFunc = 
Mockito.mock(Function.class);
-        Whitebox.setInternalState(client.clientSession(),
-                                  "sendFunction", sendFunc);
+        Whitebox.setInternalState(client.clientSession(), "sendFunction", 
sendFunc);
 
         Mockito.doThrow(new RuntimeException("test exception"))
                .when(sendFunc)
                .apply(Mockito.any());
 
-        Assert.assertThrows(RuntimeException.class, () -> {
-            client.finishSession();
-        }, e -> {
+        Assert.assertThrows(RuntimeException.class, client::finishSession, e 
-> {
             Assert.assertContains("test exception", e.getMessage());
         });
     }
 
     @Test
     public void testFlowControl() throws IOException {
-        ByteBuffer buffer = ByteBuffer.wrap(
-                StringEncoding.encode("test data"));
+        ByteBuffer buffer = ByteBuffer.wrap(StringEncoding.encode("test 
data"));
         NettyTransportClient client = (NettyTransportClient) this.oneClient();
 
         client.startSession();
 
-        Object sendFuncBak = Whitebox.getInternalState(client.clientSession(),
-                                                       "sendFunction");
+        Object sendFuncBak = Whitebox.getInternalState(client.clientSession(), 
"sendFunction");
         Function<Message, ChannelFuture> sendFunc = message -> null;
-        Whitebox.setInternalState(client.clientSession(),
-                                  "sendFunction", sendFunc);
+        Whitebox.setInternalState(client.clientSession(), "sendFunction", 
sendFunc);
 
         for (int i = 1; i <= conf.maxPendingRequests() * 2; i++) {
             boolean send = client.send(MessageType.MSG, 1, buffer);
@@ -285,10 +254,8 @@ public class NettyTransportClientTest extends 
AbstractNetworkTest {
             }
         }
 
-        int maxRequestId = Whitebox.getInternalState(client.clientSession(),
-                                                     "maxRequestId");
-        int maxAckId = Whitebox.getInternalState(client.clientSession(),
-                                                 "maxAckId");
+        int maxRequestId = Whitebox.getInternalState(client.clientSession(), 
"maxRequestId");
+        int maxAckId = Whitebox.getInternalState(client.clientSession(), 
"maxAckId");
         Assert.assertEquals(conf.maxPendingRequests(), maxRequestId);
         Assert.assertEquals(0, maxAckId);
 
@@ -299,18 +266,15 @@ public class NettyTransportClientTest extends 
AbstractNetworkTest {
         }
         Assert.assertTrue(client.checkSendAvailable());
 
-        maxAckId = Whitebox.getInternalState(client.clientSession(),
-                                             "maxAckId");
+        maxAckId = Whitebox.getInternalState(client.clientSession(), 
"maxAckId");
         Assert.assertEquals(pendings + 1, maxAckId);
 
-        Whitebox.setInternalState(client.clientSession(), "sendFunction",
-                                  sendFuncBak);
+        Whitebox.setInternalState(client.clientSession(), "sendFunction", 
sendFuncBak);
     }
 
     @Test
     public void testHandlerException() throws IOException {
         NettyTransportClient client = (NettyTransportClient) this.oneClient();
-
         client.startSession();
 
         Mockito.doThrow(new RuntimeException("test 
exception")).when(serverHandler)
@@ -320,14 +284,10 @@ public class NettyTransportClientTest extends 
AbstractNetworkTest {
         boolean send = client.send(MessageType.MSG, 1, buffer);
         Assert.assertTrue(send);
 
-        Whitebox.setInternalState(client, "timeoutFinishSession",
-                                  1000L);
+        Whitebox.setInternalState(client, "timeoutFinishSession", 1000L);
 
-        Assert.assertThrows(TransportException.class, () -> {
-            client.finishSession();
-        }, e -> {
-            Assert.assertContains("to wait finish-response",
-                                  e.getMessage());
+        Assert.assertThrows(TransportException.class, client::finishSession, e 
-> {
+            Assert.assertContains("finish-response", e.getMessage());
         });
 
         Mockito.verify(serverHandler, Mockito.timeout(10_000L).times(1))
@@ -344,13 +304,11 @@ public class NettyTransportClientTest extends 
AbstractNetworkTest {
 
         TransportConf conf = TransportConf.wrapConfig(config);
 
-        Assert.assertThrows(IllegalArgumentException.class,
-                            conf::minPendingRequests);
+        Assert.assertThrows(IllegalArgumentException.class, 
conf::minPendingRequests);
     }
 
     @Test
-    public void testSessionActive() throws IOException, InterruptedException,
-                                           ExecutionException,
+    public void testSessionActive() throws IOException, InterruptedException, 
ExecutionException,
                                            TimeoutException {
         NettyTransportClient client = (NettyTransportClient) this.oneClient();
 
diff --git 
a/computer-test/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManagerTest.java
 
b/computer-test/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManagerTest.java
index b423d386..bba8b9e5 100644
--- 
a/computer-test/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManagerTest.java
+++ 
b/computer-test/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManagerTest.java
@@ -72,17 +72,10 @@ public class MessageRecvManagerTest extends UnitTestBase {
         this.fileManager.init(this.config);
         this.sortManager = new RecvSortManager(context());
         this.sortManager.init(this.config);
-        this.receiveManager = new MessageRecvManager(context(),
-                                                     this.fileManager,
-                                                     this.sortManager);
-        this.snapshotManager = new SnapshotManager(context(),
-                                                   null,
-                                                   receiveManager,
-                                                   null);
+        this.receiveManager = new MessageRecvManager(context(), 
this.fileManager, this.sortManager);
+        this.snapshotManager = new SnapshotManager(context(), null, 
receiveManager, null);
         this.receiveManager.init(this.config);
-        this.connectionId = new ConnectionId(
-                            new InetSocketAddress("localhost",8081),
-                            0);
+        this.connectionId = new ConnectionId(new 
InetSocketAddress("localhost", 8081), 0);
     }
 
     @After
@@ -94,31 +87,28 @@ public class MessageRecvManagerTest extends UnitTestBase {
 
     @Test
     public void testVertexAndEdgeMessage() throws IOException {
-        // Send vertex message
+        // Send vertex messages
         this.receiveManager.onStarted(this.connectionId);
         this.receiveManager.onFinished(this.connectionId);
-        VertexMessageRecvPartitionTest.addTenVertexBuffer(
-                                       (NetworkBuffer buffer) -> {
+        VertexMessageRecvPartitionTest.addTenVertexBuffer((NetworkBuffer 
buffer) -> {
             this.receiveManager.handle(MessageType.VERTEX, 0, buffer);
         });
 
-        EdgeMessageRecvPartitionTest.addTenEdgeBuffer(
-                                     (NetworkBuffer buffer) -> {
+        EdgeMessageRecvPartitionTest.addTenEdgeBuffer((NetworkBuffer buffer) 
-> {
             this.receiveManager.handle(MessageType.EDGE, 0, buffer);
         });
-        // Send edge message
+        // Send edge messages
         this.receiveManager.onStarted(this.connectionId);
         this.receiveManager.onFinished(this.connectionId);
 
         this.receiveManager.waitReceivedAllMessages();
         Map<Integer, PeekableIterator<KvEntry>> vertexPartitions =
-                     this.receiveManager.vertexPartitions();
+                this.receiveManager.vertexPartitions();
         Map<Integer, PeekableIterator<KvEntry>> edgePartitions =
-                     this.receiveManager.edgePartitions();
+                this.receiveManager.edgePartitions();
         Assert.assertEquals(1, vertexPartitions.size());
         Assert.assertEquals(1, edgePartitions.size());
-        VertexMessageRecvPartitionTest.checkPartitionIterator(
-                                       vertexPartitions.get(0));
+        
VertexMessageRecvPartitionTest.checkPartitionIterator(vertexPartitions.get(0));
         EdgeMessageRecvPartitionTest.checkTenEdges(edgePartitions.get(0));
     }
 
@@ -126,9 +116,8 @@ public class MessageRecvManagerTest extends UnitTestBase {
     public void testComputeMessage() throws IOException {
         // Superstep 0
         this.receiveManager.beforeSuperstep(this.config, 0);
-        ComputeMessageRecvPartitionTest.addTwentyCombineMessageBuffer(
-                                        (NetworkBuffer buffer) -> {
-             this.receiveManager.handle(MessageType.MSG, 0, buffer);
+        
ComputeMessageRecvPartitionTest.addTwentyCombineMessageBuffer((NetworkBuffer 
buffer) -> {
+            this.receiveManager.handle(MessageType.MSG, 0, buffer);
         });
         this.receiveManager.onFinished(this.connectionId);
 
@@ -136,17 +125,15 @@ public class MessageRecvManagerTest extends UnitTestBase {
         this.receiveManager.afterSuperstep(this.config, 0);
 
         Map<Integer, PeekableIterator<KvEntry>> messagePartitions =
-        this.receiveManager.messagePartitions();
+                this.receiveManager.messagePartitions();
         Assert.assertEquals(1, messagePartitions.size());
-        ComputeMessageRecvPartitionTest.checkTenCombineMessages(
-                                        messagePartitions.get(0));
+        
ComputeMessageRecvPartitionTest.checkTenCombineMessages(messagePartitions.get(0));
     }
 
     @Test
     public void testOtherMessageType() {
         Assert.assertThrows(ComputerException.class, () -> {
-            ReceiverUtil.consumeBuffer(new byte[100],
-                                       (NetworkBuffer buffer) -> {
+            ReceiverUtil.consumeBuffer(new byte[100], (NetworkBuffer buffer) 
-> {
                 this.receiveManager.handle(MessageType.ACK, 0, buffer);
             });
         }, e -> {
@@ -161,8 +148,7 @@ public class MessageRecvManagerTest extends UnitTestBase {
         Assert.assertThrows(ComputerException.class, () -> {
             this.receiveManager.waitReceivedAllMessages();
         }, e -> {
-            Assert.assertContains("Expect 1 finish-messages",
-                                  e.getMessage());
+            Assert.assertContains("finish-messages", e.getMessage());
         });
     }
 }
diff --git 
a/computer-test/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerServiceTest.java
 
b/computer-test/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerServiceTest.java
index b056c7d5..3e7d1c0b 100644
--- 
a/computer-test/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerServiceTest.java
+++ 
b/computer-test/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerServiceTest.java
@@ -47,56 +47,48 @@ public class WorkerServiceTest extends UnitTestBase {
 
         pool.submit(() -> {
             Config config = UnitTestBase.updateWithRequiredOptions(
-                ComputerOptions.JOB_ID, "local_002",
-                ComputerOptions.JOB_WORKERS_COUNT, "1",
-                ComputerOptions.TRANSPORT_SERVER_PORT, "8086",
-                ComputerOptions.BSP_REGISTER_TIMEOUT, "100000",
-                ComputerOptions.BSP_LOG_INTERVAL, "30000",
-                ComputerOptions.BSP_MAX_SUPER_STEP, "2",
-                ComputerOptions.WORKER_COMPUTATION_CLASS,
-                MockComputation.class.getName(),
-                ComputerOptions.ALGORITHM_RESULT_CLASS,
-                DoubleValue.class.getName(),
-                ComputerOptions.ALGORITHM_MESSAGE_CLASS,
-                DoubleValue.class.getName(),
-                ComputerOptions.OUTPUT_CLASS,
-                LimitedLogOutput.class.getName()
+                    ComputerOptions.JOB_ID, "local_002",
+                    ComputerOptions.JOB_WORKERS_COUNT, "1",
+                    ComputerOptions.TRANSPORT_SERVER_PORT, "8086",
+                    ComputerOptions.BSP_REGISTER_TIMEOUT, "100000",
+                    ComputerOptions.BSP_LOG_INTERVAL, "30000",
+                    ComputerOptions.BSP_MAX_SUPER_STEP, "2",
+                    ComputerOptions.WORKER_COMPUTATION_CLASS,
+                    MockComputation.class.getName(),
+                    ComputerOptions.ALGORITHM_RESULT_CLASS,
+                    DoubleValue.class.getName(),
+                    ComputerOptions.ALGORITHM_MESSAGE_CLASS,
+                    DoubleValue.class.getName(),
+                    ComputerOptions.OUTPUT_CLASS,
+                    LimitedLogOutput.class.getName()
             );
-            WorkerService workerService = new MockWorkerService();
-            try {
+            try (WorkerService workerService = new MockWorkerService()) {
                 workerService.init(config);
                 workerService.execute();
             } catch (Throwable e) {
                 LOG.error("Failed to start worker", e);
                 exceptions[0] = e;
             } finally {
-                workerService.close();
-                try {
-                    workerService.close();
-                } catch (Throwable e) {
-                    Assert.fail(e.getMessage());
-                }
                 countDownLatch.countDown();
             }
         });
 
         pool.submit(() -> {
             Config config = UnitTestBase.updateWithRequiredOptions(
-                RpcOptions.RPC_SERVER_HOST, "localhost",
-                ComputerOptions.JOB_ID, "local_002",
-                ComputerOptions.JOB_WORKERS_COUNT, "1",
-                ComputerOptions.BSP_REGISTER_TIMEOUT, "100000",
-                ComputerOptions.BSP_LOG_INTERVAL, "30000",
-                ComputerOptions.BSP_MAX_SUPER_STEP, "2",
-                ComputerOptions.MASTER_COMPUTATION_CLASS,
-                MockMasterComputation.class.getName(),
-                ComputerOptions.ALGORITHM_RESULT_CLASS,
-                DoubleValue.class.getName(),
-                ComputerOptions.ALGORITHM_MESSAGE_CLASS,
-                DoubleValue.class.getName()
+                    RpcOptions.RPC_SERVER_HOST, "localhost",
+                    ComputerOptions.JOB_ID, "local_002",
+                    ComputerOptions.JOB_WORKERS_COUNT, "1",
+                    ComputerOptions.BSP_REGISTER_TIMEOUT, "100000",
+                    ComputerOptions.BSP_LOG_INTERVAL, "30000",
+                    ComputerOptions.BSP_MAX_SUPER_STEP, "2",
+                    ComputerOptions.MASTER_COMPUTATION_CLASS,
+                    MockMasterComputation.class.getName(),
+                    ComputerOptions.ALGORITHM_RESULT_CLASS,
+                    DoubleValue.class.getName(),
+                    ComputerOptions.ALGORITHM_MESSAGE_CLASS,
+                    DoubleValue.class.getName()
             );
-            MasterService masterService = new MasterService();
-            try {
+            try (MasterService masterService = new MasterService()) {
                 masterService.init(config);
                 masterService.execute();
             } catch (Throwable e) {
@@ -108,12 +100,6 @@ public class WorkerServiceTest extends UnitTestBase {
                  * if count down is executed first, and the server thread in
                  * master service will not be closed.
                  */
-                masterService.close();
-                try {
-                    masterService.close();
-                } catch (Throwable e) {
-                    Assert.fail(e.getMessage());
-                }
                 countDownLatch.countDown();
             }
         });
@@ -121,8 +107,7 @@ public class WorkerServiceTest extends UnitTestBase {
         countDownLatch.await();
         pool.shutdownNow();
 
-        Assert.assertFalse(Arrays.asList(exceptions).toString(),
-                           existError(exceptions));
+        Assert.assertFalse(Arrays.asList(exceptions).toString(), 
existError(exceptions));
     }
 
     @Test
@@ -133,89 +118,84 @@ public class WorkerServiceTest extends UnitTestBase {
 
         pool.submit(() -> {
             Config config = UnitTestBase.updateWithRequiredOptions(
-                ComputerOptions.JOB_ID, "local_003",
-                ComputerOptions.JOB_WORKERS_COUNT, "2",
-                ComputerOptions.JOB_PARTITIONS_COUNT, "2",
-                ComputerOptions.TRANSPORT_SERVER_PORT, "8086",
-                ComputerOptions.WORKER_DATA_DIRS, "[job_8086]",
-                ComputerOptions.BSP_REGISTER_TIMEOUT, "30000",
-                ComputerOptions.BSP_LOG_INTERVAL, "10000",
-                ComputerOptions.BSP_MAX_SUPER_STEP, "2",
-                ComputerOptions.WORKER_COMPUTATION_CLASS,
-                MockComputation2.class.getName(),
-                ComputerOptions.ALGORITHM_RESULT_CLASS,
-                DoubleValue.class.getName(),
-                ComputerOptions.ALGORITHM_MESSAGE_CLASS,
-                DoubleValue.class.getName()
+                    ComputerOptions.JOB_ID, "local_003",
+                    ComputerOptions.JOB_WORKERS_COUNT, "2",
+                    ComputerOptions.JOB_PARTITIONS_COUNT, "2",
+                    ComputerOptions.TRANSPORT_SERVER_PORT, "8086",
+                    ComputerOptions.WORKER_DATA_DIRS, "[job_8086]",
+                    ComputerOptions.BSP_REGISTER_TIMEOUT, "30000",
+                    ComputerOptions.BSP_LOG_INTERVAL, "10000",
+                    ComputerOptions.BSP_MAX_SUPER_STEP, "2",
+                    ComputerOptions.WORKER_COMPUTATION_CLASS,
+                    MockComputation2.class.getName(),
+                    ComputerOptions.ALGORITHM_RESULT_CLASS,
+                    DoubleValue.class.getName(),
+                    ComputerOptions.ALGORITHM_MESSAGE_CLASS,
+                    DoubleValue.class.getName()
             );
-            WorkerService workerService = new MockWorkerService();
-            try {
+
+            try (WorkerService workerService = new MockWorkerService()) {
                 workerService.init(config);
                 workerService.execute();
             } catch (Throwable e) {
                 LOG.error("Failed to start worker", e);
                 exceptions[0] = e;
             } finally {
-                workerService.close();
                 countDownLatch.countDown();
             }
         });
 
         pool.submit(() -> {
             Config config = UnitTestBase.updateWithRequiredOptions(
-                ComputerOptions.JOB_ID, "local_003",
-                ComputerOptions.JOB_WORKERS_COUNT, "2",
-                ComputerOptions.JOB_PARTITIONS_COUNT, "2",
-                ComputerOptions.TRANSPORT_SERVER_PORT, "8087",
-                ComputerOptions.WORKER_DATA_DIRS, "[job_8087]",
-                ComputerOptions.BSP_REGISTER_TIMEOUT, "30000",
-                ComputerOptions.BSP_LOG_INTERVAL, "10000",
-                ComputerOptions.BSP_MAX_SUPER_STEP, "2",
-                ComputerOptions.WORKER_COMPUTATION_CLASS,
-                MockComputation2.class.getName(),
-                ComputerOptions.ALGORITHM_RESULT_CLASS,
-                DoubleValue.class.getName(),
-                ComputerOptions.ALGORITHM_MESSAGE_CLASS,
-                DoubleValue.class.getName()
+                    ComputerOptions.JOB_ID, "local_003",
+                    ComputerOptions.JOB_WORKERS_COUNT, "2",
+                    ComputerOptions.JOB_PARTITIONS_COUNT, "2",
+                    ComputerOptions.TRANSPORT_SERVER_PORT, "8087",
+                    ComputerOptions.WORKER_DATA_DIRS, "[job_8087]",
+                    ComputerOptions.BSP_REGISTER_TIMEOUT, "30000",
+                    ComputerOptions.BSP_LOG_INTERVAL, "10000",
+                    ComputerOptions.BSP_MAX_SUPER_STEP, "2",
+                    ComputerOptions.WORKER_COMPUTATION_CLASS,
+                    MockComputation2.class.getName(),
+                    ComputerOptions.ALGORITHM_RESULT_CLASS,
+                    DoubleValue.class.getName(),
+                    ComputerOptions.ALGORITHM_MESSAGE_CLASS,
+                    DoubleValue.class.getName()
             );
-            WorkerService workerService = new MockWorkerService();
-            try {
+            try (WorkerService workerService = new MockWorkerService()) {
                 workerService.init(config);
                 workerService.execute();
             } catch (Throwable e) {
                 LOG.error("Failed to start worker", e);
                 exceptions[1] = e;
             } finally {
-                workerService.close();
                 countDownLatch.countDown();
             }
         });
 
         pool.submit(() -> {
             Config config = UnitTestBase.updateWithRequiredOptions(
-                RpcOptions.RPC_SERVER_HOST, "localhost",
-                ComputerOptions.JOB_ID, "local_003",
-                ComputerOptions.JOB_WORKERS_COUNT, "2",
-                ComputerOptions.JOB_PARTITIONS_COUNT, "2",
-                ComputerOptions.BSP_REGISTER_TIMEOUT, "30000",
-                ComputerOptions.BSP_LOG_INTERVAL, "10000",
-                ComputerOptions.BSP_MAX_SUPER_STEP, "2",
-                ComputerOptions.MASTER_COMPUTATION_CLASS,
-                MockMasterComputation2.class.getName(),
-                ComputerOptions.ALGORITHM_RESULT_CLASS,
-                DoubleValue.class.getName(),
-                ComputerOptions.ALGORITHM_MESSAGE_CLASS,
-                DoubleValue.class.getName()
+                    RpcOptions.RPC_SERVER_HOST, "localhost",
+                    ComputerOptions.JOB_ID, "local_003",
+                    ComputerOptions.JOB_WORKERS_COUNT, "2",
+                    ComputerOptions.JOB_PARTITIONS_COUNT, "2",
+                    ComputerOptions.BSP_REGISTER_TIMEOUT, "30000",
+                    ComputerOptions.BSP_LOG_INTERVAL, "10000",
+                    ComputerOptions.BSP_MAX_SUPER_STEP, "2",
+                    ComputerOptions.MASTER_COMPUTATION_CLASS,
+                    MockMasterComputation2.class.getName(),
+                    ComputerOptions.ALGORITHM_RESULT_CLASS,
+                    DoubleValue.class.getName(),
+                    ComputerOptions.ALGORITHM_MESSAGE_CLASS,
+                    DoubleValue.class.getName()
             );
-            MasterService masterService = new MasterService();
-            try {
+            try (MasterService masterService = new MasterService()) {
                 masterService.init(config);
                 masterService.execute();
             } catch (Throwable e) {
                 LOG.error("Failed to start master", e);
                 exceptions[2] = e;
             } finally {
-                masterService.close();
                 countDownLatch.countDown();
             }
         });
@@ -223,41 +203,37 @@ public class WorkerServiceTest extends UnitTestBase {
         countDownLatch.await();
         pool.shutdownNow();
 
-        Assert.assertFalse(Arrays.asList(exceptions).toString(),
-                           existError(exceptions));
+        Assert.assertFalse(Arrays.asList(exceptions).toString(), 
existError(exceptions));
     }
 
     @Test
     public void testFailToConnectEtcd() {
         Config config = UnitTestBase.updateWithRequiredOptions(
-            // Unavailable etcd endpoints
-            ComputerOptions.BSP_ETCD_ENDPOINTS, "http://abc:8098";,
-            ComputerOptions.JOB_ID, "local_004",
-            ComputerOptions.JOB_WORKERS_COUNT, "1",
-            ComputerOptions.BSP_LOG_INTERVAL, "30000",
-            ComputerOptions.BSP_MAX_SUPER_STEP, "2",
-            ComputerOptions.WORKER_COMPUTATION_CLASS,
-            MockComputation.class.getName()
+                // Unavailable etcd endpoints
+                ComputerOptions.BSP_ETCD_ENDPOINTS, "http://invalid-ip:8098";,
+                ComputerOptions.JOB_ID, "local_004",
+                ComputerOptions.JOB_WORKERS_COUNT, "1",
+                ComputerOptions.BSP_LOG_INTERVAL, "30000",
+                ComputerOptions.BSP_MAX_SUPER_STEP, "2",
+                ComputerOptions.WORKER_COMPUTATION_CLASS,
+                MockComputation.class.getName()
         );
-        WorkerService workerService = new MockWorkerService();
-        Assert.assertThrows(ComputerException.class, () -> {
-            workerService.init(config);
-            try {
+
+        try (WorkerService workerService = new MockWorkerService()) {
+            Assert.assertThrows(ComputerException.class, () -> {
+                workerService.init(config);
                 workerService.execute();
-            } finally {
-                workerService.close();
-            }
-        }, e -> {
-            Assert.assertContains("Error while getting with " +
-                                  "key='BSP_MASTER_INIT_DONE'",
-                                  e.getMessage());
-            Assert.assertContains("UNAVAILABLE: unresolved address",
-                                  e.getCause().getMessage());
-        });
+            }, e -> {
+                Assert.assertContains("Error while getting with 
key='BSP_MASTER_INIT_DONE'",
+                                      e.getMessage());
+                Assert.assertContains("UNAVAILABLE: unresolved address",
+                                      e.getCause().getMessage());
+            });
+        }
     }
 
     @Test
-    public void testDataTransportManagerFail() throws InterruptedException {
+    public void testDataTransportManagerFail() {
         /*
          * TODO: Complete this test case after data transport manager is
          *  completed.
diff --git 
a/computer-test/src/main/java/org/apache/hugegraph/computer/k8s/KubernetesDriverTest.java
 
b/computer-test/src/main/java/org/apache/hugegraph/computer/k8s/KubernetesDriverTest.java
index 7ac3e6cb..b95b602e 100644
--- 
a/computer-test/src/main/java/org/apache/hugegraph/computer/k8s/KubernetesDriverTest.java
+++ 
b/computer-test/src/main/java/org/apache/hugegraph/computer/k8s/KubernetesDriverTest.java
@@ -79,8 +79,7 @@ public class KubernetesDriverTest extends AbstractK8sTest {
         File tempFile = File.createTempFile(UUID.randomUUID().toString(), "");
         try {
             String absolutePath = tempFile.getAbsolutePath();
-            this.updateOptions(KubeDriverOptions.KUBE_CONFIG.name(),
-                               absolutePath);
+            this.updateOptions(KubeDriverOptions.KUBE_CONFIG.name(), 
absolutePath);
             NamedCluster cluster = new NamedClusterBuilder()
                     .withName("kubernetes")
                     .withNewCluster()
@@ -96,10 +95,11 @@ public class KubernetesDriverTest extends AbstractK8sTest {
                     .endContext()
                     .build();
             io.fabric8.kubernetes.api.model.Config config = Config.builder()
-                    .withClusters(cluster)
-                    .addToContexts(context)
-                    .withCurrentContext(context.getName())
-                    .build();
+                                                                  
.withClusters(cluster)
+                                                                  
.addToContexts(context)
+                                                                  
.withCurrentContext(
+                                                                          
context.getName())
+                                                                  .build();
             KubeConfigUtils.persistKubeConfigIntoFile(config, absolutePath);
             System.setProperty(Config.KUBERNETES_KUBECONFIG_FILE, 
absolutePath);
 
@@ -126,24 +126,18 @@ public class KubernetesDriverTest extends AbstractK8sTest 
{
         String namespace = Whitebox.getInternalState(this.driver, "namespace");
         HugeConfig conf = Whitebox.getInternalState(this.driver, "conf");
         Object operation = Whitebox.getInternalState(this.driver, "operation");
-        MutableBoolean watchActive = Whitebox.getInternalState(
-                                     this.driver, "watchActive");
+        MutableBoolean watchActive = Whitebox.getInternalState(this.driver, 
"watchActive");
         Assert.assertTrue(watchActive.booleanValue());
         Assert.assertEquals(namespace, "test");
         Assert.assertNotNull(conf);
         Assert.assertNotNull(operation);
 
         final int workerInstances = 2;
-        this.updateOptions(KubeSpecOptions.WORKER_INSTANCES.name(),
-                           workerInstances);
-        Map<String, Object> defaultSpec = Whitebox.invoke(
-                                          KubernetesDriver.class,
-                                          "defaultSpec",
-                                          this.driver);
-        String workerInstancesKey = KubeUtil.covertSpecKey(
-                                    KubeSpecOptions.WORKER_INSTANCES.name());
-        Assert.assertEquals(defaultSpec.get(workerInstancesKey),
-                            workerInstances);
+        this.updateOptions(KubeSpecOptions.WORKER_INSTANCES.name(), 
workerInstances);
+        Map<String, Object> defaultSpec = 
Whitebox.invoke(KubernetesDriver.class,
+                                                          "defaultSpec", 
this.driver);
+        String workerInstancesKey = 
KubeUtil.covertSpecKey(KubeSpecOptions.WORKER_INSTANCES.name());
+        Assert.assertEquals(defaultSpec.get(workerInstancesKey), 
workerInstances);
     }
 
     @Test
@@ -167,21 +161,23 @@ public class KubernetesDriverTest extends AbstractK8sTest 
{
     }
 
     @Test
-    public void testUploadAlgorithmJarWithError() throws FileNotFoundException 
{
+    public void testUploadAlgorithmJarWithError() {
         Whitebox.setInternalState(this.driver, "bashPath", 
"conf/images/upload_test-x.sh");
         String url = "https://github.com/apache/hugegraph-doc/raw/"; +
                      "binary-1.0/dist/computer/test.jar";
         String path = "conf/images/test.jar";
         downloadFileByUrl(url, path);
 
-        InputStream inputStream = new FileInputStream(path);
-        Assert.assertThrows(ComputerDriverException.class, () -> {
-            this.driver.uploadAlgorithmJar("PageRank", inputStream);
-        }, e -> {
-            ComputerDriverException exception = (ComputerDriverException) e;
-            Assert.assertContains("No such file",
-                                  exception.rootCause().getMessage());
-        });
+        try (InputStream inputStream = new FileInputStream(path)) {
+            Assert.assertThrows(ComputerDriverException.class, () -> {
+                this.driver.uploadAlgorithmJar("PageRank", inputStream);
+            }, e -> {
+                ComputerDriverException exception = (ComputerDriverException) 
e;
+                Assert.assertContains("No such file", 
exception.rootCause().getMessage());
+            });
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
     }
 
     @Test
@@ -189,12 +185,9 @@ public class KubernetesDriverTest extends AbstractK8sTest {
         Map<String, String> params = new HashMap<>();
         params.put(KubeSpecOptions.WORKER_INSTANCES.name(), "10");
         String jobId = this.driver.submitJob("PageRank", params);
-        HugeGraphComputerJob computerJob = this.operation
-                                               
.withName(KubeUtil.crName(jobId))
-                                               .get();
+        HugeGraphComputerJob computerJob = 
this.operation.withName(KubeUtil.crName(jobId)).get();
         Assert.assertNotNull(computerJob);
-        Assert.assertEquals(computerJob.getSpec().getAlgorithmName(),
-                            "PageRank");
+        Assert.assertEquals(computerJob.getSpec().getAlgorithmName(), 
"PageRank");
         Assert.assertEquals(computerJob.getSpec().getJobId(), jobId);
     }
 
@@ -205,16 +198,13 @@ public class KubernetesDriverTest extends AbstractK8sTest 
{
         String jobId = this.driver.submitJob("PageRank2", params);
 
         String crName = KubeUtil.crName(jobId);
-        HugeGraphComputerJob computerJob = this.operation.withName(crName)
-                                                         .get();
+        HugeGraphComputerJob computerJob = 
this.operation.withName(crName).get();
         Assert.assertNotNull(computerJob);
 
         UnitTestBase.sleep(1000L);
 
         this.driver.cancelJob(jobId, params);
-        HugeGraphComputerJob canceledComputerJob = this.operation
-                                                       .withName(crName)
-                                                       .get();
+        HugeGraphComputerJob canceledComputerJob = 
this.operation.withName(crName).get();
         Assert.assertNull(canceledComputerJob);
         Assert.assertNull(this.driver.jobState(jobId, params));
     }
@@ -227,26 +217,21 @@ public class KubernetesDriverTest extends AbstractK8sTest 
{
 
         JobObserver jobObserver = Mockito.mock(JobObserver.class);
 
-        CompletableFuture<Void> future = this.driver.waitJobAsync(jobId,
-                                                                  params,
-                                                                  jobObserver);
+        CompletableFuture<Void> future = this.driver.waitJobAsync(jobId, 
params, jobObserver);
 
         Mockito.verify(jobObserver, Mockito.timeout(5000L).atLeast(1))
                .onJobStateChanged(Mockito.any(DefaultJobState.class));
 
         future.getNow(null);
 
-        MutableBoolean watchActive = Whitebox.getInternalState(this.driver,
-                                                               "watchActive");
+        MutableBoolean watchActive = Whitebox.getInternalState(this.driver, 
"watchActive");
         watchActive.setFalse();
         this.driver.waitJobAsync(jobId, params, jobObserver);
 
         this.driver.cancelJob(jobId, params);
         UnitTestBase.sleep(1000L);
 
-        CompletableFuture<Void> future2 = this.driver.waitJobAsync(jobId,
-                                                                   params,
-                                                                   
jobObserver);
+        CompletableFuture<Void> future2 = this.driver.waitJobAsync(jobId, 
params, jobObserver);
         Assert.assertNull(future2);
     }
 
@@ -264,15 +249,12 @@ public class KubernetesDriverTest extends AbstractK8sTest 
{
     @Test
     public void testOnClose() {
         Map<String, Pair<CompletableFuture<Void>, JobObserver>> waits =
-        Whitebox.getInternalState(this.driver, "waits");
-        waits.put("test-123", Pair.of(new CompletableFuture<>(),
-                                      Mockito.mock(JobObserver.class)));
+                Whitebox.getInternalState(this.driver, "waits");
+        waits.put("test-123", Pair.of(new CompletableFuture<>(), 
Mockito.mock(JobObserver.class)));
 
-        AbstractWatchManager<HugeGraphComputerJob> watch =
-                                                   Whitebox.getInternalState(
-                                                   this.driver, "watch");
-        Watcher<HugeGraphComputerJob> watcher = Whitebox.getInternalState(
-                                                watch, "watcher");
+        AbstractWatchManager<HugeGraphComputerJob> watch = 
Whitebox.getInternalState(this.driver,
+                                                                               
      "watch");
+        Watcher<HugeGraphComputerJob> watcher = 
Whitebox.getInternalState(watch, "watcher");
 
         watcher.eventReceived(Watcher.Action.ADDED, null);
         watcher.eventReceived(Watcher.Action.ERROR, new 
HugeGraphComputerJob());
@@ -283,8 +265,7 @@ public class KubernetesDriverTest extends AbstractK8sTest {
         WatcherException testClose = new WatcherException("test close");
         watcher.onClose(testClose);
 
-        MutableBoolean watchActive = Whitebox.getInternalState(this.driver,
-                                                               "watchActive");
+        MutableBoolean watchActive = Whitebox.getInternalState(this.driver, 
"watchActive");
         Assert.assertFalse(watchActive.booleanValue());
     }
 
@@ -297,14 +278,12 @@ public class KubernetesDriverTest extends AbstractK8sTest 
{
         Assert.assertThrows(IllegalArgumentException.class, () -> {
             this.driver.submitJob("PageRank3", params);
         }, e -> {
-            Assert.assertContains(
-                    "The partitions count must be >= workers instances",
-                    e.getMessage()
+            Assert.assertContains("The partitions count must be >= workers 
instances",
+                                  e.getMessage()
             );
         });
 
-        Map<String, String> defaultConf = Whitebox.getInternalState(
-                                          this.driver, "defaultConf");
+        Map<String, String> defaultConf = 
Whitebox.getInternalState(this.driver, "defaultConf");
         defaultConf = new HashMap<>(defaultConf);
         defaultConf.remove(ComputerOptions.ALGORITHM_PARAMS_CLASS.name());
         Whitebox.setInternalState(this.driver, "defaultConf", defaultConf);
@@ -312,9 +291,8 @@ public class KubernetesDriverTest extends AbstractK8sTest {
         Assert.assertThrows(IllegalArgumentException.class, () -> {
             this.driver.submitJob("PageRank3", params);
         }, e -> {
-            Assert.assertContains(
-                    "The [algorithm.params_class] options can't be null",
-                    e.getMessage()
+            Assert.assertContains("The [algorithm.params_class] options can't 
be null",
+                                  e.getMessage()
             );
         });
     }
diff --git a/pom.xml b/pom.xml
index 2784670b..a0d0634d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -31,7 +31,8 @@
     <name>hugegraph-computer</name>
     <url>https://github.com/apache/hugegraph-computer</url>
     <description>
-        hugegraph-computer is a fast-speed, highly-scalable, fault-tolerance 
graph processing system developed by apache.
+        hugegraph-computer is a fast-speed, highly scalable, fault-tolerance 
graph processing
+        system developed by apache.
     </description>
 
     <inceptionYear>2020</inceptionYear>
@@ -88,7 +89,12 @@
     </prerequisites>
 
     <properties>
-        <revision>1.0.0</revision>
+        <!-- TODO: update the version after toolchain v1.2 fixed -->
+        <revision>1.2.0</revision>
+        <hugegraph-common-version>1.2.0</hugegraph-common-version>
+        <hugegraph-client-version>1.2.0</hugegraph-client-version>
+        <hugegraph-rpc-version>1.2.0</hugegraph-rpc-version>
+        <hugegraph-loader-version>1.2.0</hugegraph-loader-version>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <top.level.dir>${project.basedir}/..</top.level.dir>
         <release.name>hugegraph-computer</release.name>
@@ -100,10 +106,6 @@
         <hadoop-version>3.1.2</hadoop-version>
         <netty-version>4.1.42.Final</netty-version>
         <commons-lang3-version>3.12.0</commons-lang3-version>
-        <hugegraph-common-version>1.0.0</hugegraph-common-version>
-        <hugegraph-client-version>1.0.0</hugegraph-client-version>
-        <hugegraph-rpc-version>1.0.0</hugegraph-rpc-version>
-        <hugegraph-loader-version>1.0.0</hugegraph-loader-version>
         <minio-version>8.5.6</minio-version>
     </properties>
 
@@ -347,7 +349,7 @@
             </plugin>
             <plugin>
                 <artifactId>maven-compiler-plugin</artifactId>
-                <!-- Keep fix version to avoid computer-k8s-operator build 
error -->
+                <!-- Keep a fixed version to avoid computer-k8s-operator build 
error -->
                 <version>3.1</version>
                 <configuration>
                     <source>${compiler.source}</source>
@@ -526,5 +528,15 @@
                 </plugins>
             </build>
         </profile>
+        <!-- use mvn -P stage to enable the remote apache-stage repo -->
+        <profile>
+            <id>stage</id>
+            <repositories>
+                <repository>
+                    <id>staged-releases</id>
+                    
<url>https://repository.apache.org/content/groups/staging/</url>
+                </repository>
+            </repositories>
+        </profile>
     </profiles>
 </project>

Reply via email to