This is an automated email from the ASF dual-hosted git repository.

journey pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new d6a32ac  [Feature#3234][cluster]enhanced load balancing (#3235)
d6a32ac is described below

commit d6a32ac65225fa4f8d9a80c09c5ea8a29e392dac
Author: CalvinKirs <acm_mas...@163.com>
AuthorDate: Mon Aug 10 19:05:21 2020 +0800

    [Feature#3234][cluster]enhanced load balancing (#3235)
    
    * [Future#3234][cluster]enhanced load balancing
    weight-based load balancing algorithm
    this close # 3234
    
    * remove useless parameter
    
    * code smell
    
    * load balancing according to work group
    
    * add smooth weight round robin
    
    * remove unused constants
    
    * perfect unit test
    
    * code smell
    
    * code smell
    
    * add work weight config
    
    * fix config error
    
    * add weight docs to readme.md
---
 docker/build/README.md                             |   4 +
 docker/build/README_zh_CN.md                       |   4 +
 .../conf/dolphinscheduler/worker.properties.tpl    |   5 +-
 docker/build/startup-init-conf.sh                  |   1 +
 docker/docker-swarm/docker-compose.yml             |   1 +
 docker/docker-swarm/docker-stack.yml               |   1 +
 .../configmap-dolphinscheduler-worker.yaml         |   1 +
 .../statefulset-dolphinscheduler-worker.yaml       |   5 +
 docker/kubernetes/dolphinscheduler/values.yaml     |   1 +
 .../apache/dolphinscheduler/remote/utils/Host.java |  63 +++++++++--
 .../master/dispatch/host/CommonHostManager.java    |   7 +-
 .../master/dispatch/host/RandomHostManager.java    |   2 +-
 .../dispatch/host/RoundRobinHostManager.java       |   2 +-
 .../dispatch/host/assign/RandomSelector.java       |  45 +++++---
 .../dispatch/host/assign/RoundRobinSelector.java   | 120 ++++++++++++++++++---
 .../server/worker/config/WorkerConfig.java         |  12 +++
 .../server/worker/registry/WorkerRegistry.java     |  28 +++--
 .../src/main/resources/worker.properties           |   3 +
 .../dispatch/host/assign/RandomSelectorTest.java   |  16 +--
 .../host/assign/RoundRobinSelectorTest.java        |  51 ++++++---
 20 files changed, 303 insertions(+), 69 deletions(-)

diff --git a/docker/build/README.md b/docker/build/README.md
index bc516bc..951f2d6 100644
--- a/docker/build/README.md
+++ b/docker/build/README.md
@@ -238,6 +238,10 @@ This environment variable sets max cpu load avg for 
`worker-server`. The default
 
 This environment variable sets reserved memory for `worker-server`. The 
default value is `0.1`.
 
+**`WORKER_WEIGHT`**
+
+This environment variable sets port for `worker-server`. The default value is 
`100`.
+
 **`WORKER_LISTEN_PORT`**
 
 This environment variable sets port for `worker-server`. The default value is 
`1234`.
diff --git a/docker/build/README_zh_CN.md b/docker/build/README_zh_CN.md
index c2affc0..c4339a9 100644
--- a/docker/build/README_zh_CN.md
+++ b/docker/build/README_zh_CN.md
@@ -238,6 +238,10 @@ Dolphin Scheduler映像使用了几个容易遗漏的环境变量。虽然这些
 
 配置`worker-server`的保留内存,默认值 `0.1`。
 
+**`WORKER_WEIGHT`**
+
+配置`worker-server`的权重,默认之`100`。
+
 **`WORKER_LISTEN_PORT`**
 
 配置`worker-server`的端口,默认值 `1234`。
diff --git a/docker/build/conf/dolphinscheduler/worker.properties.tpl 
b/docker/build/conf/dolphinscheduler/worker.properties.tpl
index d596be9..83097dd 100644
--- a/docker/build/conf/dolphinscheduler/worker.properties.tpl
+++ b/docker/build/conf/dolphinscheduler/worker.properties.tpl
@@ -34,4 +34,7 @@ worker.reserved.memory=${WORKER_RESERVED_MEMORY}
 #worker.listen.port=${WORKER_LISTEN_PORT}
 
 # default worker group
-#worker.group=${WORKER_GROUP}
\ No newline at end of file
+#worker.groups=${WORKER_GROUP}
+
+# default worker weight
+#worker.weight=${WORKER_WEIGHT}
\ No newline at end of file
diff --git a/docker/build/startup-init-conf.sh 
b/docker/build/startup-init-conf.sh
index 73fdad6..d5cd86f 100644
--- a/docker/build/startup-init-conf.sh
+++ b/docker/build/startup-init-conf.sh
@@ -74,6 +74,7 @@ export WORKER_MAX_CPULOAD_AVG=${WORKER_MAX_CPULOAD_AVG:-"100"}
 export WORKER_RESERVED_MEMORY=${WORKER_RESERVED_MEMORY:-"0.1"}
 export WORKER_LISTEN_PORT=${WORKER_LISTEN_PORT:-"1234"}
 export WORKER_GROUP=${WORKER_GROUP:-"default"}
+export WORKER_WEIGHT=${WORKER_WEIGHT:-"100"}
 
 #============================================================================
 # Alert Server
diff --git a/docker/docker-swarm/docker-compose.yml 
b/docker/docker-swarm/docker-compose.yml
index 51eb0ae..349b3ad 100644
--- a/docker/docker-swarm/docker-compose.yml
+++ b/docker/docker-swarm/docker-compose.yml
@@ -187,6 +187,7 @@ services:
       WORKER_MAX_CPULOAD_AVG: "100"
       WORKER_RESERVED_MEMORY: "0.1"
       WORKER_GROUP: "default"
+      WORKER_WEIGHT: "100"
       DOLPHINSCHEDULER_DATA_BASEDIR_PATH: "/tmp/dolphinscheduler"
       DATABASE_HOST: dolphinscheduler-postgresql
       DATABASE_PORT: 5432
diff --git a/docker/docker-swarm/docker-stack.yml 
b/docker/docker-swarm/docker-stack.yml
index ca9f7c8..dff4a47 100644
--- a/docker/docker-swarm/docker-stack.yml
+++ b/docker/docker-swarm/docker-stack.yml
@@ -187,6 +187,7 @@ services:
       WORKER_MAX_CPULOAD_AVG: "100"
       WORKER_RESERVED_MEMORY: "0.1"
       WORKER_GROUP: "default"
+      WORKER_WEIGHT: "100"
       DOLPHINSCHEDULER_DATA_BASEDIR_PATH: "/tmp/dolphinscheduler"
       DATABASE_HOST: dolphinscheduler-postgresql
       DATABASE_PORT: 5432
diff --git 
a/docker/kubernetes/dolphinscheduler/templates/configmap-dolphinscheduler-worker.yaml
 
b/docker/kubernetes/dolphinscheduler/templates/configmap-dolphinscheduler-worker.yaml
index 1e08b67..569341c 100644
--- 
a/docker/kubernetes/dolphinscheduler/templates/configmap-dolphinscheduler-worker.yaml
+++ 
b/docker/kubernetes/dolphinscheduler/templates/configmap-dolphinscheduler-worker.yaml
@@ -31,6 +31,7 @@ data:
   WORKER_RESERVED_MEMORY: {{ .Values.worker.configmap.WORKER_RESERVED_MEMORY | 
quote }}
   WORKER_LISTEN_PORT: {{ .Values.worker.configmap.WORKER_LISTEN_PORT | quote }}
   WORKER_GROUP: {{ .Values.worker.configmap.WORKER_GROUP | quote }}
+  WORKER_WEIGHT: {{ .Values.worker.configmap.WORKER_WEIGHT | quote }}
   DOLPHINSCHEDULER_DATA_BASEDIR_PATH: {{ include 
"dolphinscheduler.worker.base.dir" . | quote }}
   dolphinscheduler_env.sh: |-
   {{- range .Values.worker.configmap.DOLPHINSCHEDULER_ENV }}
diff --git 
a/docker/kubernetes/dolphinscheduler/templates/statefulset-dolphinscheduler-worker.yaml
 
b/docker/kubernetes/dolphinscheduler/templates/statefulset-dolphinscheduler-worker.yaml
index fd32a95..ae562cc 100644
--- 
a/docker/kubernetes/dolphinscheduler/templates/statefulset-dolphinscheduler-worker.yaml
+++ 
b/docker/kubernetes/dolphinscheduler/templates/statefulset-dolphinscheduler-worker.yaml
@@ -162,6 +162,11 @@ spec:
                 configMapKeyRef:
                   name: {{ include "dolphinscheduler.fullname" . }}-worker
                   key: WORKER_GROUP
+            - name: WORKER_WEUGHT
+              valueFrom:
+                configMapKeyRef:
+                  name: {{ include "dolphinscheduler.fullname" . }}-worker
+                  key: WORKER_WEIGHT
             - name: DOLPHINSCHEDULER_DATA_BASEDIR_PATH
               valueFrom:
                 configMapKeyRef:
diff --git a/docker/kubernetes/dolphinscheduler/values.yaml 
b/docker/kubernetes/dolphinscheduler/values.yaml
index 8acb1d3..3261b08 100644
--- a/docker/kubernetes/dolphinscheduler/values.yaml
+++ b/docker/kubernetes/dolphinscheduler/values.yaml
@@ -201,6 +201,7 @@ worker:
     WORKER_RESERVED_MEMORY: "0.1"
     WORKER_LISTEN_PORT: "1234"
     WORKER_GROUP: "default"
+    WORKER_WEIGHT: "100"
     DOLPHINSCHEDULER_DATA_BASEDIR_PATH: "/tmp/dolphinscheduler"
     DOLPHINSCHEDULER_ENV:
     - "export HADOOP_HOME=/opt/soft/hadoop"
diff --git 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
index e9eaabc..b905a9f 100644
--- 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
+++ 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
@@ -20,7 +20,7 @@ import java.io.Serializable;
 import java.util.Objects;
 
 /**
- *  server address
+ * server address
  */
 public class Host implements Serializable {
 
@@ -39,6 +39,16 @@ public class Host implements Serializable {
      */
     private int port;
 
+    /**
+     * weight
+     */
+    private int weight;
+
+    /**
+     * workGroup
+     */
+    private String workGroup;
+
     public Host() {
     }
 
@@ -48,6 +58,21 @@ public class Host implements Serializable {
         this.address = ip + ":" + port;
     }
 
+    public Host(String ip, int port, int weight) {
+        this.ip = ip;
+        this.port = port;
+        this.address = ip + ":" + port;
+        this.weight = weight;
+    }
+
+    public Host(String ip, int port, int weight,String workGroup) {
+        this.ip = ip;
+        this.port = port;
+        this.address = ip + ":" + port;
+        this.weight = weight;
+        this.workGroup=workGroup;
+    }
+
     public String getAddress() {
         return address;
     }
@@ -65,6 +90,14 @@ public class Host implements Serializable {
         this.address = ip + ":" + port;
     }
 
+    public int getWeight() {
+        return weight;
+    }
+
+    public void setWeight(int weight) {
+        this.weight = weight;
+    }
+
     public int getPort() {
         return port;
     }
@@ -74,31 +107,47 @@ public class Host implements Serializable {
         this.address = ip + ":" + port;
     }
 
+    public String getWorkGroup() {
+        return workGroup;
+    }
+
+    public void setWorkGroup(String workGroup) {
+        this.workGroup = workGroup;
+    }
+
     /**
      * address convert host
+     *
      * @param address address
      * @return host
      */
-    public static Host of(String address){
-        if(address == null) {
+    public static Host of(String address) {
+        if (address == null) {
             throw new IllegalArgumentException("Host : address is null.");
         }
         String[] parts = address.split(":");
-        if (parts.length != 2) {
+        if (parts.length < 2) {
             throw new IllegalArgumentException(String.format("Host : %s 
illegal.", address));
         }
-        Host host = new Host(parts[0], Integer.parseInt(parts[1]));
+        Host host = null;
+        if (parts.length == 2) {
+            host = new Host(parts[0], Integer.parseInt(parts[1]));
+        }
+        if (parts.length == 3) {
+            host = new Host(parts[0], Integer.parseInt(parts[1]), 
Integer.parseInt(parts[2]));
+        }
         return host;
     }
 
     /**
      * whether old version
+     *
      * @param address address
      * @return old version is true , otherwise is false
      */
-    public static Boolean isOldVersion(String address){
+    public static Boolean isOldVersion(String address) {
         String[] parts = address.split(":");
-        return parts.length != 2 ? true : false;
+        return parts.length != 2 && parts.length != 3;
     }
 
     @Override
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java
index 58006bf..4a3d4bd 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java
@@ -71,7 +71,12 @@ public abstract class CommonHostManager implements 
HostManager {
             return host;
         }
         List<Host> candidateHosts = new ArrayList<>(nodes.size());
-        nodes.stream().forEach(node -> candidateHosts.add(Host.of(node)));
+        nodes.forEach(node -> {
+            Host nodeHost=Host.of(node);
+            nodeHost.setWorkGroup(context.getWorkerGroup());
+            candidateHosts.add(nodeHost);
+        });
+
 
         return select(candidateHosts);
     }
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RandomHostManager.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RandomHostManager.java
index ef2b6fd..241906a 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RandomHostManager.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RandomHostManager.java
@@ -38,7 +38,7 @@ public class RandomHostManager extends CommonHostManager {
      * set round robin
      */
     public RandomHostManager(){
-        this.selector = new RandomSelector<>();
+        this.selector = new RandomSelector();
     }
 
     @Override
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java
index e9fef49..ec1945e 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java
@@ -38,7 +38,7 @@ public class RoundRobinHostManager extends CommonHostManager {
      * set round robin
      */
     public RoundRobinHostManager(){
-        this.selector = new RoundRobinSelector<>();
+        this.selector = new RoundRobinSelector();
     }
 
     @Override
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelector.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelector.java
index e00d6f7..6975127 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelector.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelector.java
@@ -17,27 +17,44 @@
 
 package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
 
+import org.apache.dolphinscheduler.remote.utils.Host;
+
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Random;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
 
 /**
  * random selector
- * @param <T> T
  */
-public class RandomSelector<T> extends AbstractSelector<T> {
-
-    private final Random random = new Random();
+public class RandomSelector extends AbstractSelector<Host> {
 
     @Override
-    public T doSelect(final Collection<T> source) {
-
-        int size = source.size();
-        /**
-         *  random select
-         */
-        int randomIndex = random.nextInt(size);
-
-        return (T) source.toArray()[randomIndex];
+    public Host doSelect(final Collection<Host> source) {
+
+        List<Host> hosts = new ArrayList<>(source);
+        int size = hosts.size();
+        int[] weights = new int[size];
+        int totalWeight = 0;
+        int index = 0;
+
+        for (Host host : hosts) {
+            totalWeight += host.getWeight();
+            weights[index] = host.getWeight();
+            index++;
+        }
+
+        if (totalWeight > 0) {
+            int offset = ThreadLocalRandom.current().nextInt(totalWeight);
+
+            for (int i = 0; i < size; i++) {
+                offset -= weights[i];
+                if (offset < 0) {
+                    return hosts.get(i);
+                }
+            }
+        }
+        return hosts.get(ThreadLocalRandom.current().nextInt(size));
     }
 
 }
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelector.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelector.java
index 06e469f..34a79ac 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelector.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelector.java
@@ -16,27 +16,123 @@
  */
 package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
 
+import org.apache.dolphinscheduler.remote.utils.Host;
 import org.springframework.stereotype.Service;
 
-import java.util.Collection;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
- * round robin selector
- * @param <T> T
+ * Smooth Weight Round Robin
  */
 @Service
-public class RoundRobinSelector<T> extends AbstractSelector<T> {
+public class RoundRobinSelector extends AbstractSelector<Host> {
+
+    private ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> 
workGroupWeightMap = new ConcurrentHashMap<>();
+
+    private static final int RECYCLE_PERIOD = 100000;
+
+    private AtomicBoolean updateLock = new AtomicBoolean();
+
+    protected static class WeightedRoundRobin {
+        private int weight;
+        private AtomicLong current = new AtomicLong(0);
+        private long lastUpdate;
+
+        int getWeight() {
+            return weight;
+        }
+
+        void setWeight(int weight) {
+            this.weight = weight;
+            current.set(0);
+        }
+
+        long increaseCurrent() {
+            return current.addAndGet(weight);
+        }
+
+        void sel(int total) {
+            current.addAndGet(-1L * total);
+        }
+
+        long getLastUpdate() {
+            return lastUpdate;
+        }
+
+        void setLastUpdate(long lastUpdate) {
+            this.lastUpdate = lastUpdate;
+        }
+
+    }
 
-    private final AtomicInteger index = new AtomicInteger(0);
 
     @Override
-    public T doSelect(Collection<T> source) {
+    public Host doSelect(Collection<Host> source) {
+
+        List<Host> hosts = new ArrayList<>(source);
+        String key = hosts.get(0).getWorkGroup();
+        ConcurrentMap<String, WeightedRoundRobin> map = 
workGroupWeightMap.get(key);
+        if (map == null) {
+            workGroupWeightMap.putIfAbsent(key, new ConcurrentHashMap<>());
+            map = workGroupWeightMap.get(key);
+        }
+
+        int totalWeight = 0;
+        long maxCurrent = Long.MIN_VALUE;
+        long now = System.currentTimeMillis();
+        Host selectedHost = null;
+        WeightedRoundRobin selectWeightRoundRobin = null;
+
+        for (Host host : hosts) {
+            String workGroupHost = host.getWorkGroup() + host.getAddress();
+            WeightedRoundRobin weightedRoundRobin = map.get(workGroupHost);
+            int weight = host.getWeight();
+            if (weight < 0) {
+                weight = 0;
+            }
+
+            if (weightedRoundRobin == null) {
+                weightedRoundRobin = new WeightedRoundRobin();
+                // set weight
+                weightedRoundRobin.setWeight(weight);
+                map.putIfAbsent(workGroupHost, weightedRoundRobin);
+                weightedRoundRobin = map.get(workGroupHost);
+            }
+            if (weight != weightedRoundRobin.getWeight()) {
+                weightedRoundRobin.setWeight(weight);
+            }
+
+            long cur = weightedRoundRobin.increaseCurrent();
+            weightedRoundRobin.setLastUpdate(now);
+            if (cur > maxCurrent) {
+                maxCurrent = cur;
+                selectedHost = host;
+                selectWeightRoundRobin = weightedRoundRobin;
+            }
+
+            totalWeight += weight;
+        }
+
+
+        if (!updateLock.get() && hosts.size() != map.size() && 
updateLock.compareAndSet(false, true)) {
+            try {
+                ConcurrentMap<String, WeightedRoundRobin> newMap = new 
ConcurrentHashMap<>(map);
+                newMap.entrySet().removeIf(item -> now - 
item.getValue().getLastUpdate() > RECYCLE_PERIOD);
+                workGroupWeightMap.put(key, newMap);
+            } finally {
+                updateLock.set(false);
+            }
+        }
+
+        if (selectedHost != null) {
+            selectWeightRoundRobin.sel(totalWeight);
+            return selectedHost;
+        }
 
-        int size = source.size();
-        /**
-         * round robin
-         */
-        return (T) source.toArray()[index.getAndIncrement() % size];
+        return hosts.get(0);
     }
 }
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
index 2dedaf8..fa97403 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
@@ -49,6 +49,9 @@ public class WorkerConfig {
     @Value("${worker.listen.port: 1234}")
     private int listenPort;
 
+    @Value("${worker.weight:100}")
+    private int weight;
+
     public int getListenPort() {
         return listenPort;
     }
@@ -107,4 +110,13 @@ public class WorkerConfig {
     public void setWorkerMaxCpuloadAvg(int workerMaxCpuloadAvg) {
         this.workerMaxCpuloadAvg = workerMaxCpuloadAvg;
     }
+
+
+    public int getWeight() {
+        return weight;
+    }
+
+    public void setWeight(int weight) {
+        this.weight = weight;
+    }
 }
\ No newline at end of file
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
index 5e400e1..6204edc 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
@@ -16,9 +16,6 @@
  */
 package org.apache.dolphinscheduler.server.worker.registry;
 
-import static 
org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
-import static org.apache.dolphinscheduler.common.Constants.SLASH;
-
 import java.util.Date;
 import java.util.Set;
 import java.util.concurrent.Executors;
@@ -44,9 +41,11 @@ import org.springframework.stereotype.Service;
 
 import com.google.common.collect.Sets;
 
+import static org.apache.dolphinscheduler.common.Constants.*;
+
 
 /**
- *  worker registry
+ * worker registry
  */
 @Service
 public class WorkerRegistry {
@@ -54,13 +53,13 @@ public class WorkerRegistry {
     private final Logger logger = 
LoggerFactory.getLogger(WorkerRegistry.class);
 
     /**
-     *  zookeeper registry center
+     * zookeeper registry center
      */
     @Autowired
     private ZookeeperRegistryCenter zookeeperRegistryCenter;
 
     /**
-     *  worker config
+     * worker config
      */
     @Autowired
     private WorkerConfig workerConfig;
@@ -86,7 +85,7 @@ public class WorkerRegistry {
     }
 
     /**
-     *  registry
+     * registry
      */
     public void registry() {
         String address = NetUtils.getHost();
@@ -122,7 +121,7 @@ public class WorkerRegistry {
     }
 
     /**
-     *  remove registry info
+     * remove registry info
      */
     public void unRegistry() {
         String address = getLocalAddress();
@@ -135,13 +134,14 @@ public class WorkerRegistry {
     }
 
     /**
-     *  get worker path
+     * get worker path
      */
     private Set<String> getWorkerZkPaths() {
         Set<String> workerZkPaths = Sets.newHashSet();
 
         String address = getLocalAddress();
         String workerZkPathPrefix = 
this.zookeeperRegistryCenter.getWorkerPath();
+        String weight = getWorkerWeight();
 
         for (String workGroup : this.workerGroups) {
             StringBuilder workerZkPathBuilder = new StringBuilder(100);
@@ -152,15 +152,23 @@ public class WorkerRegistry {
             // trim and lower case is need
             
workerZkPathBuilder.append(workGroup.trim().toLowerCase()).append(SLASH);
             workerZkPathBuilder.append(address);
+            workerZkPathBuilder.append(weight).append(SLASH);
             workerZkPaths.add(workerZkPathBuilder.toString());
         }
         return workerZkPaths;
     }
 
     /**
-     *  get local address
+     * get local address
      */
     private String getLocalAddress() {
         return NetUtils.getHost() + ":" + workerConfig.getListenPort();
     }
+
+    /**
+     * get Worker Weight
+     */
+    private String getWorkerWeight() {
+        return ":" + workerConfig.getWeight();
+    }
 }
diff --git a/dolphinscheduler-server/src/main/resources/worker.properties 
b/dolphinscheduler-server/src/main/resources/worker.properties
index 0365c8a..9fba30c 100644
--- a/dolphinscheduler-server/src/main/resources/worker.properties
+++ b/dolphinscheduler-server/src/main/resources/worker.properties
@@ -32,3 +32,6 @@
 
 # default worker group
 #worker.groups=default
+
+# default worker weight
+#work.weight=100
diff --git 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelectorTest.java
 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelectorTest.java
index a14ea32..f25a227 100644
--- 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelectorTest.java
+++ 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelectorTest.java
@@ -16,7 +16,9 @@
  */
 package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
 
+import org.apache.commons.lang.ObjectUtils;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.remote.utils.Host;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -36,16 +38,16 @@ public class RandomSelectorTest {
 
     @Test
     public void testSelect1(){
-        RandomSelector<String> selector = new RandomSelector();
-        String result = selector.select(Arrays.asList("1"));
-        Assert.assertTrue(StringUtils.isNotEmpty(result));
-        Assert.assertTrue(result.equalsIgnoreCase("1"));
+        RandomSelector selector = new RandomSelector();
+        Host result = selector.select(Arrays.asList(new 
Host("192.168.1.1",80,100),new Host("192.168.1.2",80,20)));
+        Assert.assertNotNull(result);
     }
 
     @Test
     public void testSelect(){
-        RandomSelector<Integer> selector = new RandomSelector();
-        int result = selector.select(Arrays.asList(1,2,3,4,5,6,7));
-        Assert.assertTrue(result >= 1 && result <= 7);
+        RandomSelector selector = new RandomSelector();
+        Host result = selector.select(Arrays.asList(new 
Host("192.168.1.1",80,100),new Host("192.168.1.1",80,20)));
+        Assert.assertNotNull(result);
+
     }
 }
diff --git 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelectorTest.java
 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelectorTest.java
index adc55a4..ed62caa 100644
--- 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelectorTest.java
+++ 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelectorTest.java
@@ -17,6 +17,7 @@
 package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
 
 import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.remote.utils.Host;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -30,26 +31,46 @@ import java.util.List;
 public class RoundRobinSelectorTest {
 
     @Test(expected = IllegalArgumentException.class)
-    public void testSelectWithIllegalArgumentException(){
+    public void testSelectWithIllegalArgumentException() {
         RoundRobinSelector selector = new RoundRobinSelector();
         selector.select(Collections.EMPTY_LIST);
     }
 
     @Test
-    public void testSelect1(){
-        RoundRobinSelector<String> selector = new RoundRobinSelector();
-        String result = selector.select(Arrays.asList("1"));
-        Assert.assertTrue(StringUtils.isNotEmpty(result));
-        Assert.assertTrue(result.equalsIgnoreCase("1"));
-    }
+    public void testSelect1() {
+        RoundRobinSelector selector = new RoundRobinSelector();
+        Host result = null;
+        result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, 
"kris"), new Host("192.168.1.2", 80, 10, "kris")));
+        Assert.assertEquals("192.168.1.1", result.getIp());
+        result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, 
"kris"), new Host("192.168.1.2", 80, 10, "kris")));
+        Assert.assertEquals("192.168.1.2", result.getIp());
+        result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, 
"kris"), new Host("192.168.1.2", 80, 10, "kris")));
+        Assert.assertEquals("192.168.1.1", result.getIp());
+        // add new host
+        result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, 
"kris"), new Host("192.168.1.2", 80, 10, "kris")));
+        Assert.assertEquals("192.168.1.1", result.getIp());
+        result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, 
"kris"), new Host("192.168.1.2", 80, 10, "kris")));
+        Assert.assertEquals("192.168.1.2", result.getIp());
+        result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, 
"kris"), new Host("192.168.1.2", 80, 10, "kris"), new Host("192.168.1.3", 80, 
10, "kris")));
+        Assert.assertEquals("192.168.1.1",result.getIp());
+        result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, 
"kris"), new Host("192.168.1.2", 80, 10, "kris"), new Host("192.168.1.3", 80, 
10, "kris")));
+        Assert.assertEquals("192.168.1.3",result.getIp());
+        result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, 
"kris"), new Host("192.168.1.2", 80, 10, "kris"), new Host("192.168.1.3", 80, 
10, "kris")));
+        Assert.assertEquals("192.168.1.1",result.getIp());
+        result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, 
"kris"), new Host("192.168.1.2", 80, 10, "kris"), new Host("192.168.1.3", 80, 
10, "kris")));
+        Assert.assertEquals("192.168.1.2",result.getIp());
+        result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, 
"kris"), new Host("192.168.1.2", 80, 10, "kris"), new Host("192.168.1.3", 80, 
10, "kris")));
+        Assert.assertEquals("192.168.1.1",result.getIp());
+        result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, 
"kris"), new Host("192.168.1.2", 80, 10, "kris"), new Host("192.168.1.3", 80, 
10, "kris")));
+        Assert.assertEquals("192.168.1.3",result.getIp());
+        // remove host3
+        result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, 
"kris"), new Host("192.168.1.2", 80, 10, "kris")));
+        Assert.assertEquals("192.168.1.1",result.getIp());
+        result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, 
"kris"), new Host("192.168.1.2", 80, 10, "kris")));
+        Assert.assertEquals("192.168.1.2",result.getIp());
+        result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, 
"kris"), new Host("192.168.1.2", 80, 10, "kris")));
+        Assert.assertEquals("192.168.1.1",result.getIp());
 
-    @Test
-    public void testSelect(){
-        RoundRobinSelector<Integer> selector = new RoundRobinSelector();
-        List<Integer> sources = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
-        int result = selector.select(sources);
-        Assert.assertTrue(result == 1);
-        int result2 = selector.select(Arrays.asList(1,2,3,4,5,6,7));
-        Assert.assertTrue(result2 == 2);
     }
+
 }

Reply via email to